You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/02 15:15:01 UTC
svn commit: r959977 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ main/java/org/apache/camel/model/
main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/interceptor/
main/java/org/apache/camel/spi/ main/java/o...
Author: davsclaus
Date: Fri Jul 2 13:14:59 2010
New Revision: 959977
URL: http://svn.apache.org/viewvc?rev=959977&view=rev
Log:
CAMEL-2876: Delayer and throttle EIP supports non blocking delays using .asyncDelayed() option.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java
- copied, changed from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
- copied, changed from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
camel/trunk/camel-core/src/test/resources/log4j.properties
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Fri Jul 2 13:14:59 2010
@@ -155,6 +155,29 @@ public class DefaultExecutorServiceStrat
return answer;
}
+ public ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef) {
+ ScheduledExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class);
+ if (answer != null && LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Looking up ScheduledExecutorService with ref: " + executorServiceRef + " and found it from Registry: " + answer);
+ }
+ }
+
+ if (answer == null) {
+ ThreadPoolProfile profile = getThreadPoolProfile(name);
+ if (profile != null) {
+ int poolSize = profile.getPoolSize();
+ answer = newScheduledThreadPool(source, name, poolSize);
+ if (answer != null && LOG.isDebugEnabled()) {
+ LOG.debug("Looking up ScheduledExecutorService with ref: " + executorServiceRef
+ + " and found a matching ThreadPoolProfile to create the ScheduledExecutorService: " + answer);
+ }
+ }
+ }
+
+ return answer;
+ }
+
public ExecutorService newDefaultThreadPool(Object source, String name) {
ThreadPoolProfile profile = getDefaultThreadPoolProfile();
ObjectHelper.notNull(profile, "DefaultThreadPoolProfile");
@@ -194,6 +217,11 @@ public class DefaultExecutorServiceStrat
return answer;
}
+ public ScheduledExecutorService newScheduledThreadPool(Object source, String name) {
+ int poolSize = getDefaultThreadPoolProfile().getPoolSize();
+ return newScheduledThreadPool(source, name, poolSize);
+ }
+
public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true);
onThreadPoolCreated(answer);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java Fri Jul 2 13:14:59 2010
@@ -16,9 +16,13 @@
*/
package org.apache.camel.model;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -27,6 +31,7 @@ import org.apache.camel.model.language.E
import org.apache.camel.processor.Delayer;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <delay/> element
@@ -35,7 +40,16 @@ import org.apache.camel.util.ObjectHelpe
*/
@XmlRootElement(name = "delay")
@XmlAccessorType(XmlAccessType.FIELD)
-public class DelayDefinition extends ExpressionNode {
+public class DelayDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<DelayDefinition> {
+
+ @XmlTransient
+ private ExecutorService executorService;
+ @XmlAttribute(required = false)
+ private String executorServiceRef;
+ @XmlAttribute
+ private Boolean asyncDelayed;
+ @XmlAttribute
+ private Boolean callerRunsWhenRejected = Boolean.TRUE;
public DelayDefinition() {
}
@@ -59,6 +73,39 @@ public class DelayDefinition extends Exp
return "Delay[" + getExpression() + " -> " + getOutputs() + "]";
}
+ @Override
+ public Processor createProcessor(RouteContext routeContext) throws Exception {
+ Processor childProcessor = this.createChildProcessor(routeContext, false);
+ Expression delay = createAbsoluteTimeDelayExpression(routeContext);
+
+ ScheduledExecutorService scheduled = null;
+ if (getAsyncDelayed() != null && getAsyncDelayed()) {
+ scheduled = ExecutorServiceHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this);
+ if (scheduled == null) {
+ scheduled = routeContext.getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "Delay");
+ }
+ }
+
+ Delayer answer = new Delayer(childProcessor, delay, scheduled);
+ if (getAsyncDelayed() != null) {
+ answer.setAsyncDelayed(getAsyncDelayed());
+ }
+ if (getCallerRunsWhenRejected() != null) {
+ answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
+ }
+ return answer;
+ }
+
+ private Expression createAbsoluteTimeDelayExpression(RouteContext routeContext) {
+ ExpressionDefinition expr = getExpression();
+ if (expr != null) {
+ if (ObjectHelper.isNotEmpty(expr.getExpression()) || expr.getExpressionValue() != null) {
+ return expr.createExpression(routeContext);
+ }
+ }
+ return null;
+ }
+
// Fluent API
// -------------------------------------------------------------------------
@@ -72,21 +119,72 @@ public class DelayDefinition extends Exp
setExpression(new ExpressionDefinition(ExpressionBuilder.constantExpression(delay)));
return this;
}
-
- @Override
- public Processor createProcessor(RouteContext routeContext) throws Exception {
- Processor childProcessor = this.createChildProcessor(routeContext, false);
- Expression delay = createAbsoluteTimeDelayExpression(routeContext);
- return new Delayer(childProcessor, delay);
+
+ /**
+ * Whether or not the caller should run the task when it was rejected by the thread pool.
+ * <p/>
+ * Is by default <tt>true</tt>
+ *
+ * @param callerRunsWhenRejected whether or not the caller should run
+ * @return the builder
+ */
+ public DelayDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
+ setCallerRunsWhenRejected(callerRunsWhenRejected);
+ return this;
}
- private Expression createAbsoluteTimeDelayExpression(RouteContext routeContext) {
- ExpressionDefinition expr = getExpression();
- if (expr != null) {
- if (ObjectHelper.isNotEmpty(expr.getExpression()) || expr.getExpressionValue() != null) {
- return expr.createExpression(routeContext);
- }
- }
- return null;
+ /**
+ * Enables asynchronous delay which means the thread will <b>not</b> block while delaying.
+ *
+ * @return the builder
+ */
+ public DelayDefinition asyncDelayed() {
+ setAsyncDelayed(true);
+ return this;
+ }
+
+ public DelayDefinition executorService(ExecutorService executorService) {
+ setExecutorService(executorService);
+ return this;
+ }
+
+ public DelayDefinition executorServiceRef(String executorServiceRef) {
+ setExecutorServiceRef(executorServiceRef);
+ return this;
+ }
+
+ // Properties
+ // -------------------------------------------------------------------------
+
+ public Boolean getAsyncDelayed() {
+ return asyncDelayed;
+ }
+
+ public void setAsyncDelayed(Boolean asyncDelayed) {
+ this.asyncDelayed = asyncDelayed;
+ }
+
+ public Boolean getCallerRunsWhenRejected() {
+ return callerRunsWhenRejected;
+ }
+
+ public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
+ this.callerRunsWhenRejected = callerRunsWhenRejected;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public String getExecutorServiceRef() {
+ return executorServiceRef;
+ }
+
+ public void setExecutorServiceRef(String executorServiceRef) {
+ this.executorServiceRef = executorServiceRef;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Fri Jul 2 13:14:59 2010
@@ -16,16 +16,18 @@
*/
package org.apache.camel.model;
-import java.util.List;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.Processor;
import org.apache.camel.processor.Throttler;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <throttle/> element
@@ -34,11 +36,19 @@ import org.apache.camel.spi.RouteContext
*/
@XmlRootElement(name = "throttle")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ThrottleDefinition extends OutputDefinition<ThrottleDefinition> {
+public class ThrottleDefinition extends OutputDefinition<ThrottleDefinition> implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
+ @XmlTransient
+ private ExecutorService executorService;
+ @XmlAttribute(required = false)
+ private String executorServiceRef;
@XmlAttribute
private Long maximumRequestsPerPeriod;
@XmlAttribute
private long timePeriodMillis = 1000;
+ @XmlAttribute
+ private Boolean asyncDelayed;
+ @XmlAttribute
+ private Boolean callerRunsWhenRejected = Boolean.TRUE;
public ThrottleDefinition() {
}
@@ -66,7 +76,23 @@ public class ThrottleDefinition extends
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
Processor childProcessor = this.createChildProcessor(routeContext, true);
- return new Throttler(childProcessor, maximumRequestsPerPeriod, timePeriodMillis);
+
+ ScheduledExecutorService scheduled = null;
+ if (getAsyncDelayed() != null && getAsyncDelayed()) {
+ scheduled = ExecutorServiceHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this);
+ if (scheduled == null) {
+ scheduled = routeContext.getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "Throttle");
+ }
+ }
+
+ Throttler answer = new Throttler(childProcessor, maximumRequestsPerPeriod, timePeriodMillis, scheduled);
+ if (getAsyncDelayed() != null) {
+ answer.setAsyncDelayed(getAsyncDelayed());
+ }
+ if (getCallerRunsWhenRejected() != null) {
+ answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
+ }
+ return answer;
}
// Fluent API
@@ -94,6 +120,39 @@ public class ThrottleDefinition extends
return this;
}
+ /**
+ * Whether or not the caller should run the task when it was rejected by the thread pool.
+ * <p/>
+ * Is by default <tt>true</tt>
+ *
+ * @param callerRunsWhenRejected whether or not the caller should run
+ * @return the builder
+ */
+ public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
+ setCallerRunsWhenRejected(callerRunsWhenRejected);
+ return this;
+ }
+
+ /**
+ * Enables asynchronous delay which means the thread will <b>not</b> block while delaying.
+ *
+ * @return the builder
+ */
+ public ThrottleDefinition asyncDelayed() {
+ setAsyncDelayed(true);
+ return this;
+ }
+
+ public ThrottleDefinition executorService(ExecutorService executorService) {
+ setExecutorService(executorService);
+ return this;
+ }
+
+ public ThrottleDefinition executorServiceRef(String executorServiceRef) {
+ setExecutorServiceRef(executorServiceRef);
+ return this;
+ }
+
// Properties
// -------------------------------------------------------------------------
@@ -113,4 +172,35 @@ public class ThrottleDefinition extends
this.timePeriodMillis = timePeriodMillis;
}
+ public Boolean getAsyncDelayed() {
+ return asyncDelayed;
+ }
+
+ public void setAsyncDelayed(Boolean asyncDelayed) {
+ this.asyncDelayed = asyncDelayed;
+ }
+
+ public Boolean getCallerRunsWhenRejected() {
+ return callerRunsWhenRejected;
+ }
+
+ public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
+ this.callerRunsWhenRejected = callerRunsWhenRejected;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public String getExecutorServiceRef() {
+ return executorServiceRef;
+ }
+
+ public void setExecutorServiceRef(String executorServiceRef) {
+ this.executorServiceRef = executorServiceRef;
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Fri Jul 2 13:14:59 2010
@@ -16,13 +16,14 @@
*/
package org.apache.camel.processor;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.camel.AlreadyStoppedException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,83 +37,140 @@ import org.apache.commons.logging.LogFac
*/
public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
protected final transient Log log = LogFactory.getLog(getClass());
- private final CountDownLatch stoppedLatch = new CountDownLatch(1);
- private boolean fastStop = true;
+ private final ScheduledExecutorService executorService;
+ private boolean asyncDelayed;
+ private boolean callerRunsWhenRejected = true;
+
+ // TODO: Add option to cancel tasks on shutdown so we can stop fast
+
+ private final class ProcessCall implements Runnable {
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+
+ public ProcessCall(Exchange exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ this.callback = callback;
+ }
+
+ public void run() {
+ if (log.isTraceEnabled()) {
+ log.trace("Delayed task woke up and continues routing for exchangeId: " + exchange.getExchangeId());
+ }
+ if (!isRunAllowed()) {
+ exchange.setException(new RejectedExecutionException("Run is not allowed"));
+ }
+ DelayProcessorSupport.super.process(exchange, callback);
+ // signal callback we are done async
+ callback.done(false);
+ }
+ }
public DelayProcessorSupport(Processor processor) {
+ this(processor, null);
+ }
+
+ public DelayProcessorSupport(Processor processor, ScheduledExecutorService executorService) {
super(processor);
+ this.executorService = executorService;
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- try {
- delay(exchange);
- } catch (Exception e) {
- // exception occurred so we are done
- exchange.setException(e);
+ if (!isRunAllowed()) {
+ exchange.setException(new RejectedExecutionException("Run is not allowed"));
callback.done(true);
return true;
}
- return super.process(exchange, callback);
+
+ // calculate delay and wait
+ long delay = calculateDelay(exchange);
+ if (delay <= 0) {
+ // no delay then continue routing
+ return super.process(exchange, callback);
+ }
+
+ if (!isAsyncDelayed() || exchange.isTransacted()) {
+ // use synchronous delay (also required if using transactions)
+ try {
+ delay(delay, exchange);
+ // then continue routing
+ return super.process(exchange, callback);
+ } catch (Exception e) {
+ // exception occurred so we are done
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+ } else {
+ // asynchronous delay so schedule a process call task
+ ProcessCall call = new ProcessCall(exchange, callback);
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("Scheduling delayed task to run in " + delay + " millis for exchangeId: " + exchange.getExchangeId());
+ }
+ executorService.schedule(call, delay, TimeUnit.MILLISECONDS);
+ // tell Camel routing engine we continue routing asynchronous
+ return false;
+ } catch (RejectedExecutionException e) {
+ if (isCallerRunsWhenRejected()) {
+ if (!isRunAllowed()) {
+ exchange.setException(new RejectedExecutionException());
+ } else {
+ // let caller run by processing
+ delay(delay, exchange);
+ // then continue routing
+ return super.process(exchange, callback);
+ }
+ } else {
+ exchange.setException(e);
+ }
+ // caller doesn't run the task so we are done
+ callback.done(true);
+ return true;
+ }
+ }
}
- public boolean isFastStop() {
- return fastStop;
+ public boolean isAsyncDelayed() {
+ return asyncDelayed;
}
- /**
- * Enables & disables a fast stop; basically to avoid waiting a possibly
- * long time for delays to complete before the context shuts down; instead
- * the current processing method throws
- * {@link org.apache.camel.AlreadyStoppedException} to terminate processing.
- */
- public void setFastStop(boolean fastStop) {
- this.fastStop = fastStop;
+ public void setAsyncDelayed(boolean asyncDelayed) {
+ this.asyncDelayed = asyncDelayed;
+ }
+
+ public boolean isCallerRunsWhenRejected() {
+ return callerRunsWhenRejected;
}
- protected void doStop() throws Exception {
- stoppedLatch.countDown();
- super.doStop();
+ public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
+ this.callerRunsWhenRejected = callerRunsWhenRejected;
}
- protected abstract void delay(Exchange exchange) throws Exception;
+ protected abstract long calculateDelay(Exchange exchange);
/**
- * Wait until the given system time before continuing
+ * Delays the given time before continuing.
+ * <p/>
+ * This implementation will block while waiting
*
- * @param time the system time to wait for
+ * @param delay the delay time in millis
* @param exchange the exchange being processed
*/
- protected void waitUntil(long time, Exchange exchange) throws Exception {
+ protected void delay(long delay, Exchange exchange) {
// only run is we are started
- while (isRunAllowed()) {
- long delay = time - currentSystemTime();
- if (delay < 0) {
- return;
- } else {
- if (isFastStop() && !isRunAllowed()) {
- throw new AlreadyStoppedException();
- }
- try {
- sleep(delay);
- } catch (InterruptedException e) {
- handleSleepInterruptedException(e);
- }
- }
+ if (!isRunAllowed()) {
+ return;
}
- }
- protected void sleep(long delay) throws InterruptedException {
- if (delay <= 0) {
+ if (delay < 0) {
return;
- }
- if (log.isTraceEnabled()) {
- log.trace("Sleeping for: " + delay + " millis");
- }
- if (isFastStop()) {
- stoppedLatch.await(delay, TimeUnit.MILLISECONDS);
} else {
- Thread.sleep(delay);
+ try {
+ sleep(delay);
+ } catch (InterruptedException e) {
+ handleSleepInterruptedException(e);
+ }
}
}
@@ -129,4 +187,22 @@ public abstract class DelayProcessorSupp
protected long currentSystemTime() {
return System.currentTimeMillis();
}
+
+ private void sleep(long delay) throws InterruptedException {
+ if (delay <= 0) {
+ return;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Sleeping for: " + delay + " millis");
+ }
+ Thread.sleep(delay);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (isAsyncDelayed()) {
+ ObjectHelper.notNull(executorService, "executorService", this);
+ }
+ super.doStart();
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Fri Jul 2 13:14:59 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -33,8 +35,8 @@ public class Delayer extends DelayProces
private Expression delay;
private long delayValue;
- public Delayer(Processor processor, Expression delay) {
- super(processor);
+ public Delayer(Processor processor, Expression delay, ScheduledExecutorService executorService) {
+ super(processor, executorService);
this.delay = delay;
}
@@ -62,11 +64,7 @@ public class Delayer extends DelayProces
// Implementation methods
// -------------------------------------------------------------------------
- /**
- * Waits for an optional time period before continuing to process the
- * exchange
- */
- protected void delay(Exchange exchange) throws Exception {
+ protected long calculateDelay(Exchange exchange) {
long time = 0;
if (delay != null) {
Long longValue = delay.evaluate(exchange, Long.class);
@@ -79,21 +77,10 @@ public class Delayer extends DelayProces
}
if (time <= 0) {
// no delay
- return;
+ return 0;
}
- // now add the current time
- time += defaultProcessTime(exchange);
-
- waitUntil(time, exchange);
- }
-
- /**
- * A Strategy Method to allow derived implementations to decide the current
- * system time or some other default exchange property
- */
- protected long defaultProcessTime(Exchange exchange) {
- return currentSystemTime();
+ return time;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Jul 2 13:14:59 2010
@@ -77,6 +77,7 @@ public class ThreadsProcessor extends Se
ProcessCall call = new ProcessCall(exchange, callback);
try {
executorService.submit(call);
+ // tell Camel routing engine we continue routing asynchronous
return false;
} catch (RejectedExecutionException e) {
if (isCallerRunsWhenRejected()) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Fri Jul 2 13:14:59 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -33,14 +35,14 @@ import org.apache.camel.Processor;
public class Throttler extends DelayProcessorSupport implements Traceable {
private long maximumRequestsPerPeriod;
private long timePeriodMillis;
- private TimeSlot slot;
+ private volatile TimeSlot slot;
public Throttler(Processor processor, long maximumRequestsPerPeriod) {
- this(processor, maximumRequestsPerPeriod, 1000);
+ this(processor, maximumRequestsPerPeriod, 1000, null);
}
- public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) {
- super(processor);
+ public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis, ScheduledExecutorService executorService) {
+ super(processor, executorService);
this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
this.timePeriodMillis = timePeriodMillis;
}
@@ -81,10 +83,14 @@ public class Throttler extends DelayProc
// Implementation methods
// -----------------------------------------------------------------------
- protected void delay(Exchange exchange) throws Exception {
+
+ protected long calculateDelay(Exchange exchange) {
TimeSlot slot = nextSlot();
if (!slot.isActive()) {
- waitUntil(slot.startTime, exchange);
+ long delay = slot.startTime - currentSystemTime();
+ return delay;
+ } else {
+ return 0;
}
}
@@ -107,7 +113,7 @@ public class Throttler extends DelayProc
*/
protected class TimeSlot {
- private long capacity = Throttler.this.maximumRequestsPerPeriod;
+ private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
private final long duration = Throttler.this.timePeriodMillis;
private final long startTime;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java Fri Jul 2 13:14:59 2010
@@ -40,10 +40,11 @@ public class DelayInterceptor extends De
return "DelayInterceptor[delay: " + delayer.getDelay() + " on: " + node + "]";
}
- public void delay(Exchange exchange) throws Exception {
+ public long calculateDelay(Exchange exchange) {
if (delayer.isEnabled()) {
- long time = currentSystemTime() + delayer.getDelay();
- waitUntil(time, exchange);
+ return delayer.getDelay();
+ } else {
+ return 0;
}
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Fri Jul 2 13:14:59 2010
@@ -112,6 +112,17 @@ public interface ExecutorServiceStrategy
ExecutorService lookup(Object source, String name, String executorServiceRef);
/**
+ * Lookup a {@link java.util.concurrent.ScheduledExecutorService} from the {@link org.apache.camel.spi.Registry}
+ * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @param executorServiceRef reference to lookup
+ * @return the {@link java.util.concurrent.ScheduledExecutorService} or <tt>null</tt> if not found
+ */
+ ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef);
+
+ /**
* Creates a new thread pool using the default thread pool profile.
*
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
@@ -141,6 +152,8 @@ public interface ExecutorServiceStrategy
/**
* Creates a new scheduled thread pool.
+ * <p/>
+ * Will use the pool size from the default thread pool profile
*
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
* @param name name which is appended to the thread name
@@ -150,6 +163,15 @@ public interface ExecutorServiceStrategy
ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize);
/**
+ * Creates a new scheduled thread pool using the pool size from the default thread pool profile.
+ *
+ * @param source the source object, usually it should be <tt>this</tt> passed in as parameter
+ * @param name name which is appended to the thread name
+ * @return the created thread pool
+ */
+ ScheduledExecutorService newScheduledThreadPool(Object source, String name);
+
+ /**
* Creates a new fixed thread pool.
*
* @param source the source object, usually it should be <tt>this</tt> passed in as parameter
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java Fri Jul 2 13:14:59 2010
@@ -1256,7 +1256,7 @@ public final class ObjectHelper {
return "0x" + Integer.toHexString(System.identityHashCode(object));
}
- private static class ExceptionIterator implements Iterator<Throwable> {
+ private static final class ExceptionIterator implements Iterator<Throwable> {
private List<Throwable> tree = new ArrayList<Throwable>();
private Iterator<Throwable> it;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Jul 2 13:14:59 2010
@@ -261,4 +261,48 @@ public final class ExecutorServiceHelper
return null;
}
+ /**
+ * Looks up the configured {@link java.util.concurrent.ScheduledExecutorService} for the given definition.
+ * <p/>
+ * The configured thread pool is looked up in the following order:
+ * <ul>
+ * <li>from the definition, if an executor service is explicitly configured on it.</li>
+ * <li>from the {@link org.apache.camel.spi.Registry} if found</li>
+ * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
+ * <li>if none found, then <tt>null</tt> is returned.</li>
+ * </ul>
+ * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
+ * configured executor services in the same coherent way.
+ *
+ * @param routeContext the route context
+ * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
+ * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
+ * @param definition the node definition which may leverage executor service.
+ * @return the configured executor service, or <tt>null</tt> if none was configured.
+ * @throws IllegalArgumentException if the explicitly configured executor service is not a ScheduledExecutorService,
+ * or if the referenced executor service could not be looked up as a ScheduledExecutorService.
+ */
+ public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name,
+ ExecutorServiceAwareDefinition definition) throws IllegalArgumentException {
+ ExecutorServiceStrategy strategy = routeContext.getCamelContext().getExecutorServiceStrategy();
+ ObjectHelper.notNull(strategy, "ExecutorServiceStrategy", routeContext.getCamelContext());
+
+ // prefer to use explicit configured executor on the definition (its ref may be unset, so report the instance itself)
+ ExecutorService executorService = definition.getExecutorService();
+ if (executorService != null) {
+ if (executorService instanceof ScheduledExecutorService) {
+ return (ScheduledExecutorService) executorService;
+ }
+ throw new IllegalArgumentException("ExecutorService " + executorService + " is not a ScheduledExecutorService instance");
+ } else if (definition.getExecutorServiceRef() != null) {
+ ScheduledExecutorService answer = strategy.lookupScheduled(definition, name, definition.getExecutorServiceRef());
+ if (answer == null) {
+ throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry.");
+ }
+ return answer;
+ }
+
+ return null;
+ }
+
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java (from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java&r1=959949&r2=959977&rev=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java Fri Jul 2 13:14:59 2010
@@ -23,7 +23,7 @@ import org.apache.camel.component.mock.M
/**
* @version $Revision$
*/
-public class DelayerTest extends ContextTestSupport {
+public class DelayerAsyncDelayedTest extends ContextTestSupport {
public void testSendingMessageGetsDelayed() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -54,13 +54,13 @@ public class DelayerTest extends Context
return new RouteBuilder() {
public void configure() {
// START SNIPPET: ex
- from("seda:a").delay().header("MyDelay").to("mock:result");
+ from("seda:a").delay().header("MyDelay").asyncDelayed().to("mock:result");
// END SNIPPET: ex
// START SNIPPET: ex2
- from("seda:b").delay(1000).to("mock:result");
+ from("seda:b").delay(1000).asyncDelayed().to("mock:result");
// END SNIPPET: ex2
}
};
}
-}
+}
\ No newline at end of file
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java (from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java&r1=959949&r2=959977&rev=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java Fri Jul 2 13:14:59 2010
@@ -22,14 +22,13 @@ import java.util.concurrent.Executors;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.Throttler.TimeSlot;
/**
* @version $Revision$
*/
-public class ThrottlerTest extends ContextTestSupport {
+public class ThrottlerAsyncDelayedTest extends ContextTestSupport {
private static final int INTERVAL = 500;
- protected int messageCount = 6;
+ protected int messageCount = 9;
public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -44,7 +43,7 @@ public class ThrottlerTest extends Conte
// to check that the throttle really does kick in
resultEndpoint.assertIsSatisfied();
}
-
+
public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
long start = System.currentTimeMillis();
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -55,41 +54,26 @@ public class ThrottlerTest extends Conte
executor.execute(new Runnable() {
public void run() {
template.sendBody("direct:a", "<message>payload</message>");
- }
+ }
});
}
-
+
// let's wait for the exchanges to arrive
resultEndpoint.assertIsSatisfied();
-
+
// now assert that they have actually been throttled
long minimumTime = (messageCount - 1) * INTERVAL;
assertTrue("Should take at least " + minimumTime + "ms", System.currentTimeMillis() - start >= minimumTime);
}
-
- public void testTimeSlotCalculus() throws Exception {
- Throttler throttler = new Throttler(null, 2, 1000);
- TimeSlot slot = throttler.nextSlot();
- // start a new time slot
- assertNotNull(slot);
- // make sure the same slot is used (2 exchanges per slot)
- assertSame(slot, throttler.nextSlot());
- assertTrue(slot.isFull());
-
- TimeSlot next = throttler.nextSlot();
- // now we should have a new slot that starts somewhere in the future
- assertNotSame(slot, next);
- assertFalse(next.isActive());
- }
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: ex
- from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result");
+ from("seda:a").throttle(3).timePeriodMillis(10000).asyncDelayed().to("log:result", "mock:result");
// END SNIPPET: ex
-
- from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result");
+
+ from("direct:a").throttle(1).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result", "mock:result");
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Fri Jul 2 13:14:59 2010
@@ -29,7 +29,7 @@ import org.apache.camel.processor.Thrott
*/
public class ThrottlerTest extends ContextTestSupport {
private static final int INTERVAL = 500;
- protected int messageCount = 6;
+ protected int messageCount = 9;
public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
@@ -68,7 +68,7 @@ public class ThrottlerTest extends Conte
}
public void testTimeSlotCalculus() throws Exception {
- Throttler throttler = new Throttler(null, 2, 1000);
+ Throttler throttler = new Throttler(null, 2, 1000, null);
TimeSlot slot = throttler.nextSlot();
// start a new time slot
assertNotNull(slot);
@@ -86,10 +86,10 @@ public class ThrottlerTest extends Conte
return new RouteBuilder() {
public void configure() {
// START SNIPPET: ex
- from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result");
+ from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result");
// END SNIPPET: ex
- from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result");
+ from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("log:result", "mock:result");
}
};
}
Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=959977&r1=959976&r2=959977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Fri Jul 2 13:14:59 2010
@@ -34,6 +34,8 @@ log4j.logger.org.apache.activemq.spring=
#log4j.logger.org.apache.camel.processor.RoutingSlip=TRACE
#log4j.logger.org.apache.camel.processor.TryProcessor=TRACE
#log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
+#log4j.logger.org.apache.camel.processor.Delayer=TRACE
+#log4j.logger.org.apache.camel.processor.Throttler=TRACE
log4j.logger.org.apache.camel.impl.converter=WARN
log4j.logger.org.apache.camel.management=WARN
log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN