You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/08/12 14:46:59 UTC

camel git commit: CAMEL-7425: Add EventDrivenPollingConsumer option to specify timeout

Repository: camel
Updated Branches:
  refs/heads/master aabb88c42 -> dd01ea747


CAMEL-7425: Add EventDrivenPollingConsumer option to specify timeout


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dd01ea74
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dd01ea74
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dd01ea74

Branch: refs/heads/master
Commit: dd01ea7474a591df100dcb94fabefd2c142192a8
Parents: aabb88c
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 12 14:54:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 12 14:54:20 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/DefaultEndpoint.java  | 27 +++++++++++++++++++-
 .../camel/impl/EventDrivenPollingConsumer.java  | 19 +++++++++++++-
 2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dd01ea74/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
index f724e43..e4caf13 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
@@ -70,6 +70,7 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
     private Map<String, Object> consumerProperties;
     private int pollingConsumerQueueSize = 1000;
     private boolean pollingConsumerBlockWhenFull = true;
+    private long pollingConsumerBlockTimeout;
 
     /**
      * Constructs a fully-initialized DefaultEndpoint instance. This is the
@@ -222,9 +223,13 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
 
     public PollingConsumer createPollingConsumer() throws Exception {
         // should not call configurePollingConsumer when its EventDrivenPollingConsumer
-        LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} and blockWhenFull: {}", getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
+                    new Object[]{getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout()});
+        }
         EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize());
         consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+        consumer.setBlockTimeout(getPollingConsumerBlockTimeout());
         return consumer;
     }
 
@@ -321,6 +326,26 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
         this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull;
     }
 
+    /**
+     * Gets the timeout in millis to use when adding to the internal queue when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+     * is being used.
+     *
+     * @see #setPollingConsumerBlockWhenFull(boolean)
+     */
+    public long getPollingConsumerBlockTimeout() {
+        return pollingConsumerBlockTimeout;
+    }
+
+    /**
+     * Sets the timeout in millis to use when adding to the internal queue when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+     * is being used.
+     *
+     * @see #setPollingConsumerBlockWhenFull(boolean)
+     */
+    public void setPollingConsumerBlockTimeout(long pollingConsumerBlockTimeout) {
+        this.pollingConsumerBlockTimeout = pollingConsumerBlockTimeout;
+    }
+
     public void configureProperties(Map<String, Object> options) {
         Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
         if (consumerProperties != null && !consumerProperties.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dd01ea74/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index 764882e..624e9b2 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.IsSingleton;
 import org.apache.camel.PollingConsumerPollingStrategy;
 import org.apache.camel.Processor;
@@ -47,6 +48,7 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
     private ExceptionHandler interruptedExceptionHandler;
     private Consumer consumer;
     private boolean blockWhenFull = true;
+    private long blockTimeout;
     private final int queueCapacity;
 
     public EventDrivenPollingConsumer(Endpoint endpoint) {
@@ -79,6 +81,14 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
         this.blockWhenFull = blockWhenFull;
     }
 
+    public long getBlockTimeout() {
+        return blockTimeout;
+    }
+
+    public void setBlockTimeout(long blockTimeout) {
+        this.blockTimeout = blockTimeout;
+    }
+
     /**
      * Gets the queue capacity.
      */
@@ -139,7 +149,14 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
     public void process(Exchange exchange) throws Exception {
         if (isBlockWhenFull()) {
             try {
-                queue.put(exchange);
+                if (getBlockTimeout() <= 0) {
+                    queue.put(exchange);
+                } else {
+                    boolean added = queue.offer(exchange, getBlockTimeout(), TimeUnit.MILLISECONDS);
+                    if (!added) {
+                        throw new ExchangeTimedOutException(exchange, getBlockTimeout());
+                    }
+                }
             } catch (InterruptedException e) {
                 // ignore
                 log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());