You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/05/06 09:56:12 UTC

git commit: CAMEL-7411: EventDrivenPollingConsumer can lose exchanges when the internal queue is full

Repository: camel
Updated Branches:
  refs/heads/camel-2.13.x 46e52e753 -> a032bc143


CAMEL-7411: EventDrivenPollingConsumer can lose exchanges when the internal queue is full


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a032bc14
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a032bc14
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a032bc14

Branch: refs/heads/camel-2.13.x
Commit: a032bc14305ab511aa286012c9c5773f6031a738
Parents: 46e52e7
Author: Claus Ibsen <da...@apache.org>
Authored: Tue May 6 09:55:38 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 6 09:55:58 2014 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/DefaultEndpoint.java  |  62 +++++++-
 .../camel/impl/EventDrivenPollingConsumer.java  |  51 ++++++-
 ...EventDrivenPollingConsumerQueueSizeTest.java | 142 +++++++++++++++++++
 3 files changed, 251 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a032bc14/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
index badd48e..2a6c5b0 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
@@ -36,6 +36,8 @@ import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A default endpoint useful for implementation inheritance.
@@ -52,6 +54,7 @@ import org.apache.camel.util.URISupport;
  */
 public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class);
     private String endpointUri;
     private EndpointConfiguration endpointConfiguration;
     private CamelContext camelContext;
@@ -64,6 +67,8 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
     private boolean synchronous;
     private final String id = EndpointHelper.createEndpointId();
     private Map<String, Object> consumerProperties;
+    private int pollingConsumerQueueSize = 1000;
+    private boolean pollingConsumerBlockWhenFull = true;
 
     /**
      * Constructs a fully-initialized DefaultEndpoint instance. This is the
@@ -215,8 +220,11 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
     }
 
     public PollingConsumer createPollingConsumer() throws Exception {
-        // should not configure consumer
-        return new EventDrivenPollingConsumer(this);
+        // should not call configurePollingConsumer when its EventDrivenPollingConsumer
+        LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} and blockWhenFull: {}", getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull());
+        EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize());
+        consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+        return consumer;
     }
 
     public Exchange createExchange(Exchange exchange) {
@@ -267,6 +275,56 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
         this.synchronous = synchronous;
     }
 
+    /**
+     * Gets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
+     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
+     * <p/>
+     * The default value is <tt>1000</tt>
+     */
+    public int getPollingConsumerQueueSize() {
+        return pollingConsumerQueueSize;
+    }
+
+    /**
+     * Sets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
+     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
+     * <p/>
+     * The default value is <tt>1000</tt>
+     */
+    public void setPollingConsumerQueueSize(int pollingConsumerQueueSize) {
+        this.pollingConsumerQueueSize = pollingConsumerQueueSize;
+    }
+
+    /**
+     * Whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
+     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
+     * <p/>
+     * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown
+     * when trying to add to the queue, and its full.
+     * <p/>
+     * The default value is <tt>true</tt> which will block the producer queue until the queue has space.
+     */
+    public boolean isPollingConsumerBlockWhenFull() {
+        return pollingConsumerBlockWhenFull;
+    }
+
+    /**
+     * Set whether to block when adding to the internal queue off when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+     * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
+     * therefore not using the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
+     * <p/>
+     * Setting this option to <tt>false</tt>, will result in an {@link java.lang.IllegalStateException} being thrown
+     * when trying to add to the queue, and its full.
+     * <p/>
+     * The default value is <tt>true</tt> which will block the producer queue until the queue has space.
+     */
+    public void setPollingConsumerBlockWhenFull(boolean pollingConsumerBlockWhenFull) {
+        this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull;
+    }
+
     public void configureProperties(Map<String, Object> options) {
         Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
         if (consumerProperties != null && !consumerProperties.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/a032bc14/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index 07bcf07..428610e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -43,17 +44,53 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
     private final BlockingQueue<Exchange> queue;
     private ExceptionHandler interruptedExceptionHandler;
     private Consumer consumer;
+    private boolean blockWhenFull = true;
+    private final int queueCapacity;
 
     public EventDrivenPollingConsumer(Endpoint endpoint) {
-        this(endpoint, new ArrayBlockingQueue<Exchange>(1000));
+        this(endpoint, 1000);
+    }
+
+    public EventDrivenPollingConsumer(Endpoint endpoint, int queueSize) {
+        super(endpoint);
+        this.queueCapacity = queueSize;
+        if (queueSize <= 0) {
+            this.queue = new LinkedBlockingDeque<Exchange>();
+        } else {
+            this.queue = new ArrayBlockingQueue<Exchange>(queueSize);
+        }
+        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
     }
 
     public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) {
         super(endpoint);
         this.queue = queue;
+        this.queueCapacity = queue.remainingCapacity();
         this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
     }
 
+    public boolean isBlockWhenFull() {
+        return blockWhenFull;
+    }
+
+    public void setBlockWhenFull(boolean blockWhenFull) {
+        this.blockWhenFull = blockWhenFull;
+    }
+
+    /**
+     * Gets the queue capacity.
+     */
+    public int getQueueCapacity() {
+        return queueCapacity;
+    }
+
+    /**
+     * Gets the current queue size (no of elements in the queue).
+     */
+    public int getQueueSize() {
+        return queue.size();
+    }
+
     public Exchange receiveNoWait() {
         return receive(0);
     }
@@ -98,7 +135,16 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
     }
 
     public void process(Exchange exchange) throws Exception {
-        queue.offer(exchange);
+        if (isBlockWhenFull()) {
+            try {
+                queue.put(exchange);
+            } catch (InterruptedException e) {
+                // ignore
+                log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
+            }
+        } else {
+            queue.add(exchange);
+        }
     }
 
     public ExceptionHandler getInterruptedExceptionHandler() {
@@ -155,5 +201,6 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
 
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownService(consumer);
+        queue.clear();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a032bc14/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java b/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java
new file mode 100644
index 0000000..06649a2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Map;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+public class EventDrivenPollingConsumerQueueSizeTest extends ContextTestSupport {
+
+    private String uri = "my:foo?pollingConsumerQueueSize=10&pollingConsumerBlockWhenFull=false";
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        context.addComponent("my", new MyQueueComponent());
+    }
+
+    public void testQueueSize() throws Exception {
+        PollingConsumer consumer = context.getEndpoint(uri).createPollingConsumer();
+        consumer.start();
+
+        assertNotNull(consumer);
+        EventDrivenPollingConsumer edpc = assertIsInstanceOf(EventDrivenPollingConsumer.class, consumer);
+        assertEquals(0, edpc.getQueueSize());
+        assertEquals(10, edpc.getQueueCapacity());
+        assertFalse(edpc.isBlockWhenFull());
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody(uri, "Message " + i);
+        }
+
+        assertEquals(10, edpc.getQueueSize());
+
+        try {
+            template.sendBody(uri, "Message 10");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            // queue should be full
+            assertIsInstanceOf(IllegalStateException.class, e.getCause());
+        }
+
+        Exchange out = consumer.receive(5000);
+        assertNotNull(out);
+        assertEquals("Message 0", out.getIn().getBody());
+
+        assertEquals(9, edpc.getQueueSize());
+        assertEquals(10, edpc.getQueueCapacity());
+
+        // now there is room
+        template.sendBody(uri, "Message 10");
+
+        assertEquals(10, edpc.getQueueSize());
+        assertEquals(10, edpc.getQueueCapacity());
+
+        ServiceHelper.stopService(consumer);
+        // not cleared if we stop
+        assertEquals(10, edpc.getQueueSize());
+        assertEquals(10, edpc.getQueueCapacity());
+
+        ServiceHelper.stopAndShutdownService(consumer);
+        // now its cleared as we shutdown
+        assertEquals(0, edpc.getQueueSize());
+        assertEquals(10, edpc.getQueueCapacity());
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    private final  class MyQueueComponent extends DefaultComponent {
+
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+            return new MyQueueEndpoint(uri, this);
+        }
+    }
+
+    private final class MyQueueEndpoint extends DefaultEndpoint {
+
+        private EventDrivenPollingConsumer consumer;
+
+        private MyQueueEndpoint(String endpointUri, Component component) {
+            super(endpointUri, component);
+        }
+
+        @Override
+        public Producer createProducer() throws Exception {
+            return new DefaultProducer(this) {
+                @Override
+                public void process(Exchange exchange) throws Exception {
+                    consumer.process(exchange);
+                }
+            };
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            return consumer;
+        }
+
+        @Override
+        public PollingConsumer createPollingConsumer() throws Exception {
+            return consumer;
+        }
+
+        @Override
+        public boolean isSingleton() {
+            return true;
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            consumer = (EventDrivenPollingConsumer) super.createPollingConsumer();
+            super.doStart();
+        }
+    }
+}