You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/08/12 14:46:59 UTC
camel git commit: CAMEL-7425: Add EventDrivenPollingConsumer option
to specify timeout
Repository: camel
Updated Branches:
refs/heads/master aabb88c42 -> dd01ea747
CAMEL-7425: Add EventDrivenPollingConsumer option to specify timeout
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dd01ea74
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dd01ea74
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dd01ea74
Branch: refs/heads/master
Commit: dd01ea7474a591df100dcb94fabefd2c142192a8
Parents: aabb88c
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 12 14:54:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 12 14:54:20 2015 +0200
----------------------------------------------------------------------
.../org/apache/camel/impl/DefaultEndpoint.java | 27 +++++++++++++++++++-
.../camel/impl/EventDrivenPollingConsumer.java | 19 +++++++++++++-
2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dd01ea74/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
index f724e43..e4caf13 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
@@ -70,6 +70,7 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
private Map<String, Object> consumerProperties;
private int pollingConsumerQueueSize = 1000;
private boolean pollingConsumerBlockWhenFull = true;
+ private long pollingConsumerBlockTimeout;
/**
* Constructs a fully-initialized DefaultEndpoint instance. This is the
@@ -222,9 +223,13 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
public PollingConsumer createPollingConsumer() throws Exception {
// should not call configurePollingConsumer when its EventDrivenPollingConsumer
- LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} and blockWhenFull: {}", getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
+ new Object[]{getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout()});
+ }
EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize());
consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+ consumer.setBlockTimeout(getPollingConsumerBlockTimeout());
return consumer;
}
@@ -321,6 +326,26 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull;
}
+ /**
+ * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+ * is being used.
+ *
+ * @see #setPollingConsumerBlockWhenFull(boolean)
+ */
+ public long getPollingConsumerBlockTimeout() {
+ return pollingConsumerBlockTimeout;
+ }
+
+ /**
+ * Sets the timeout in millis to use when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+ * is being used.
+ *
+ * @see #setPollingConsumerBlockWhenFull(boolean)
+ */
+ public void setPollingConsumerBlockTimeout(long pollingConsumerBlockTimeout) {
+ this.pollingConsumerBlockTimeout = pollingConsumerBlockTimeout;
+ }
+
public void configureProperties(Map<String, Object> options) {
Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
if (consumerProperties != null && !consumerProperties.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/dd01ea74/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index 764882e..624e9b2 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.Processor;
@@ -47,6 +48,7 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
private ExceptionHandler interruptedExceptionHandler;
private Consumer consumer;
private boolean blockWhenFull = true;
+ private long blockTimeout;
private final int queueCapacity;
public EventDrivenPollingConsumer(Endpoint endpoint) {
@@ -79,6 +81,14 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
this.blockWhenFull = blockWhenFull;
}
+ public long getBlockTimeout() {
+ return blockTimeout;
+ }
+
+ public void setBlockTimeout(long blockTimeout) {
+ this.blockTimeout = blockTimeout;
+ }
+
/**
* Gets the queue capacity.
*/
@@ -139,7 +149,14 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
public void process(Exchange exchange) throws Exception {
if (isBlockWhenFull()) {
try {
- queue.put(exchange);
+ if (getBlockTimeout() <= 0) {
+ queue.put(exchange);
+ } else {
+ boolean added = queue.offer(exchange, getBlockTimeout(), TimeUnit.MILLISECONDS);
+ if (!added) {
+ throw new ExchangeTimedOutException(exchange, getBlockTimeout());
+ }
+ }
} catch (InterruptedException e) {
// ignore
log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());