You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by swwyatt <st...@sungard.com> on 2014/06/23 20:31:21 UTC

seda not blocking when full

Hi,

I am using a producer template to send to a seda endpoint and it's not
blocking when full. Is there something wrong with my configuration? Using
v2.12.1.

	<!-- FileProcessor bean: receives the workQueue endpoint and the producer template declared in the Camel context below. -->
	<bean id="fileProcessor" class="FileProcessor">
		<property name="workQueueEndpoint" ref="workQueue"></property>
		<property name="producerTemplate" ref="producerTemplate"></property>
	</bean>

	<!-- DataProcessor bean: referenced by the route that consumes from workQueue. -->
	<bean id="dataProcessor" class="DataProcessor"></bean>

	<camel:camelContext id="test">

		<!-- Producer template injected into fileProcessor. -->
		<camel:template id="producerTemplate" />

		<!-- Shared seda endpoint, declared with size=2, blockWhenFull=true, concurrentConsumers=2 and timeout=0.
		     NOTE(review): presumably size is the queue capacity and timeout=0 disables the reply timeout; confirm against the Camel SEDA documentation. -->
		<camel:endpoint id="workQueue"
uri="seda:workQueue?size=2&amp;blockWhenFull=true&amp;concurrentConsumers=2&amp;timeout=0"
/>

		<!-- Route 1: consumes from the c:/dvw/stream file endpoint (delay=6666) and passes each exchange to fileProcessor. -->
		<camel:route>
			<camel:from uri="file://c:/dvw/stream?delay=6666"></camel:from>
			<camel:process ref="fileProcessor"></camel:process>
		</camel:route>

		<!-- Route 2: consumes from the workQueue seda endpoint and passes each exchange to dataProcessor. -->
		<camel:route>
			<camel:from ref="workQueue"></camel:from>
			<camel:process ref="dataProcessor"></camel:process>
		</camel:route>

	</camel:camelContext>


public class FileProcessor implements Processor {
	
	private static final Logger logger = Logger.getLogger(FileProcessor.class);

	private Endpoint workQueueEndpoint;
	private ProducerTemplate producerTemplate;

	@Override
	public void process(Exchange exchange) throws Exception {
		FileReader fileReader = new
FileReader(exchange.getIn().getBody(File.class));
		BufferedReader bufferedReader = new BufferedReader(fileReader);
		Queue<Future&lt;Object>> futures = new LinkedList<Future&lt;Object>>();
		
		for (String line = bufferedReader.readLine(); line != null; line =
bufferedReader.readLine()) {
			logger.info("Sending " + line);
		
futures.add(this.getProducerTemplate().asyncRequestBody(this.getWorkQueueEndpoint(),
line));
			logger.info("Sent " + line);
		}
		
		while (futures.size() > 0) {
			Object o = futures.poll().get();
			logger.info("Result for " + o);
		}
		
		bufferedReader.close();
	}

	public Endpoint getWorkQueueEndpoint() {
		return workQueueEndpoint;
	}

	public void setWorkQueueEndpoint(Endpoint workQueueEndpoint) {
		this.workQueueEndpoint = workQueueEndpoint;
	}

	public ProducerTemplate getProducerTemplate() {
		return producerTemplate;
	}

	public void setProducerTemplate(ProducerTemplate producerTemplate) {
		this.producerTemplate = producerTemplate;
	}

}


/**
 * Camel processor that simulates work: it reads the in-body of the exchange
 * as an {@link Integer} and sleeps for that many milliseconds, logging before
 * and after the pause.
 */
public class DataProcessor implements Processor {

	private static final Logger logger = Logger.getLogger(DataProcessor.class);

	/**
	 * Sleeps for the number of milliseconds given by the exchange's in-body.
	 *
	 * @param exchange the exchange whose in-body is read as an {@link Integer}
	 * @throws Exception if the sleep is interrupted or the body cannot be used
	 */
	@Override
	public void process(Exchange exchange) throws Exception {
		final Integer delayMillis = exchange.getIn().getBody(Integer.class);

		logger.info("Processing data " + delayMillis);

		// The start message is logged before the value is unboxed, as in the original order.
		final long pause = delayMillis.longValue();
		Thread.sleep(pause);

		logger.info("Completed processing data " + delayMillis);
	}

}

2014-06-23 13:17:38,676|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sending 2000
2014-06-23 13:17:38,679|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sent 2000
2014-06-23 13:17:38,679|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sending 3000
2014-06-23 13:17:38,679|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sent 3000
2014-06-23 13:17:38,679|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sending 2500
2014-06-23 13:17:38,680|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sent 2500
2014-06-23 13:17:38,680|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sending 1000
2014-06-23 13:17:38,680|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sent 1000
2014-06-23 13:17:38,681|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sending 4000
2014-06-23 13:17:38,681|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sent 4000
2014-06-23 13:17:38,681|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sending 3500
2014-06-23 13:17:38,682|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sent 3500
2014-06-23 13:17:38,682|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sending 1500
2014-06-23 13:17:38,682|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Sent 1500
2014-06-23 13:17:38,691|Camel (test) thread #1 - seda://workQueue INFO 
DataProcessor | process | Processing data 4000
2014-06-23 13:17:38,691|Camel (test) thread #2 - seda://workQueue INFO 
DataProcessor | process | Processing data 2500
2014-06-23 13:17:41,192|Camel (test) thread #2 - seda://workQueue INFO 
DataProcessor | process | Completed processing data 2500
2014-06-23 13:17:41,193|Camel (test) thread #2 - seda://workQueue INFO 
DataProcessor | process | Processing data 3500
2014-06-23 13:17:42,692|Camel (test) thread #1 - seda://workQueue INFO 
DataProcessor | process | Completed processing data 4000
2014-06-23 13:17:42,693|Camel (test) thread #1 - seda://workQueue INFO 
DataProcessor | process | Processing data 1500
2014-06-23 13:17:44,193|Camel (test) thread #1 - seda://workQueue INFO 
DataProcessor | process | Completed processing data 1500
2014-06-23 13:17:44,194|Camel (test) thread #1 - seda://workQueue INFO 
DataProcessor | process | Processing data 2000
2014-06-23 13:17:44,693|Camel (test) thread #2 - seda://workQueue INFO 
DataProcessor | process | Completed processing data 3500
2014-06-23 13:17:44,694|Camel (test) thread #2 - seda://workQueue INFO 
DataProcessor | process | Processing data 3000
2014-06-23 13:17:46,195|Camel (test) thread #1 - seda://workQueue INFO 
DataProcessor | process | Completed processing data 2000
2014-06-23 13:17:46,196|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Result for 2000
2014-06-23 13:17:46,196|Camel (test) thread #1 - seda://workQueue INFO 
DataProcessor | process | Processing data 1000
2014-06-23 13:17:47,197|Camel (test) thread #1 - seda://workQueue INFO 
DataProcessor | process | Completed processing data 1000
2014-06-23 13:17:47,695|Camel (test) thread #2 - seda://workQueue INFO 
DataProcessor | process | Completed processing data 3000
2014-06-23 13:17:47,695|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Result for 3000
2014-06-23 13:17:47,696|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Result for 2500
2014-06-23 13:17:47,696|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Result for 1000
2014-06-23 13:17:47,696|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Result for 4000
2014-06-23 13:17:47,697|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Result for 3500
2014-06-23 13:17:47,697|Camel (test) thread #0 - file://c:/dvw/stream INFO 
FileProcessor | process | Result for 1500




--
View this message in context: http://camel.465427.n5.nabble.com/seda-not-blocking-when-full-tp5752712.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: seda not blocking when full

Posted by swwyatt <st...@sungard.com>.
If I use the send API, and I'm assuming this is a producer template send
method, how do I determine when the exchange has completed the seda route
from the thread that calls send?



--
View this message in context: http://camel.465427.n5.nabble.com/seda-not-blocking-when-full-tp5752712p5752765.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: seda not blocking when full

Posted by Willem Jiang <wi...@gmail.com>.
You used an asynchronous send, which means it cannot block the sending thread.
If you want the producer to block, you need to use the send API to send the message.

--  
Willem Jiang

Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem



On June 24, 2014 at 2:31:47 AM, swwyatt (steven.wyatt@sungard.com) wrote:
> Hi,
>  
> I'm am using a producer template to send to a seda endpoint and it's not
> blocking when full. Is there something wrong with my configuration? Using
> v2.12.1.
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
> > uri="seda:workQueue?size=2&blockWhenFull=true&concurrentConsumers=2&timeout=0"  
> />
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
> public class FileProcessor implements Processor {
>  
> private static final Logger logger = Logger.getLogger(FileProcessor.class);
>  
> private Endpoint workQueueEndpoint;
> private ProducerTemplate producerTemplate;
>  
> @Override
> public void process(Exchange exchange) throws Exception {
> FileReader fileReader = new
> FileReader(exchange.getIn().getBody(File.class));
> BufferedReader bufferedReader = new BufferedReader(fileReader);
> Queue<Future<Object>> futures = new LinkedList<Future<Object>>();
>  
> for (String line = bufferedReader.readLine(); line != null; line =
> bufferedReader.readLine()) {
> logger.info("Sending " + line);
>  
> futures.add(this.getProducerTemplate().asyncRequestBody(this.getWorkQueueEndpoint(),  
> line));
> logger.info("Sent " + line);
> }
>  
> while (futures.size() > 0) {
> Object o = futures.poll().get();
> logger.info("Result for " + o);
> }
>  
> bufferedReader.close();
> }
>  
> public Endpoint getWorkQueueEndpoint() {
> return workQueueEndpoint;
> }
>  
> public void setWorkQueueEndpoint(Endpoint workQueueEndpoint) {
> this.workQueueEndpoint = workQueueEndpoint;
> }
>  
> public ProducerTemplate getProducerTemplate() {
> return producerTemplate;
> }
>  
> public void setProducerTemplate(ProducerTemplate producerTemplate) {
> this.producerTemplate = producerTemplate;
> }
>  
> }
>  
>  
> public class DataProcessor implements Processor {
>  
> private static final Logger logger = Logger.getLogger(DataProcessor.class);
>  
> @Override
> public void process(Exchange exchange) throws Exception {
> Integer time = exchange.getIn().getBody(Integer.class);
>  
> logger.info("Processing data " + time);
> Thread.sleep(time.longValue());
> logger.info("Completed processing data " + time);
> }
>  
> }
>  
> 2014-06-23 13:17:38,676|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sending 2000
> 2014-06-23 13:17:38,679|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sent 2000
> 2014-06-23 13:17:38,679|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sending 3000
> 2014-06-23 13:17:38,679|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sent 3000
> 2014-06-23 13:17:38,679|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sending 2500
> 2014-06-23 13:17:38,680|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sent 2500
> 2014-06-23 13:17:38,680|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sending 1000
> 2014-06-23 13:17:38,680|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sent 1000
> 2014-06-23 13:17:38,681|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sending 4000
> 2014-06-23 13:17:38,681|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sent 4000
> 2014-06-23 13:17:38,681|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sending 3500
> 2014-06-23 13:17:38,682|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sent 3500
> 2014-06-23 13:17:38,682|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sending 1500
> 2014-06-23 13:17:38,682|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Sent 1500
> 2014-06-23 13:17:38,691|Camel (test) thread #1 - seda://workQueue INFO
> DataProcessor | process | Processing data 4000
> 2014-06-23 13:17:38,691|Camel (test) thread #2 - seda://workQueue INFO
> DataProcessor | process | Processing data 2500
> 2014-06-23 13:17:41,192|Camel (test) thread #2 - seda://workQueue INFO
> DataProcessor | process | Completed processing data 2500
> 2014-06-23 13:17:41,193|Camel (test) thread #2 - seda://workQueue INFO
> DataProcessor | process | Processing data 3500
> 2014-06-23 13:17:42,692|Camel (test) thread #1 - seda://workQueue INFO
> DataProcessor | process | Completed processing data 4000
> 2014-06-23 13:17:42,693|Camel (test) thread #1 - seda://workQueue INFO
> DataProcessor | process | Processing data 1500
> 2014-06-23 13:17:44,193|Camel (test) thread #1 - seda://workQueue INFO
> DataProcessor | process | Completed processing data 1500
> 2014-06-23 13:17:44,194|Camel (test) thread #1 - seda://workQueue INFO
> DataProcessor | process | Processing data 2000
> 2014-06-23 13:17:44,693|Camel (test) thread #2 - seda://workQueue INFO
> DataProcessor | process | Completed processing data 3500
> 2014-06-23 13:17:44,694|Camel (test) thread #2 - seda://workQueue INFO
> DataProcessor | process | Processing data 3000
> 2014-06-23 13:17:46,195|Camel (test) thread #1 - seda://workQueue INFO
> DataProcessor | process | Completed processing data 2000
> 2014-06-23 13:17:46,196|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Result for 2000
> 2014-06-23 13:17:46,196|Camel (test) thread #1 - seda://workQueue INFO
> DataProcessor | process | Processing data 1000
> 2014-06-23 13:17:47,197|Camel (test) thread #1 - seda://workQueue INFO
> DataProcessor | process | Completed processing data 1000
> 2014-06-23 13:17:47,695|Camel (test) thread #2 - seda://workQueue INFO
> DataProcessor | process | Completed processing data 3000
> 2014-06-23 13:17:47,695|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Result for 3000
> 2014-06-23 13:17:47,696|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Result for 2500
> 2014-06-23 13:17:47,696|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Result for 1000
> 2014-06-23 13:17:47,696|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Result for 4000
> 2014-06-23 13:17:47,697|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Result for 3500
> 2014-06-23 13:17:47,697|Camel (test) thread #0 - file://c:/dvw/stream INFO
> FileProcessor | process | Result for 1500
>  
>  
>  
>  
> --
> View this message in context: http://camel.465427.n5.nabble.com/seda-not-blocking-when-full-tp5752712.html  
> Sent from the Camel - Users mailing list archive at Nabble.com.
>