You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/05/06 09:56:12 UTC
git commit: CAMEL-7411: EventDrivenPollingConsumer can lose exchanges
when the internal queue is full
Repository: camel
Updated Branches:
refs/heads/camel-2.13.x 46e52e753 -> a032bc143
CAMEL-7411: EventDrivenPollingConsumer can lose exchanges when the internal queue is full
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a032bc14
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a032bc14
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a032bc14
Branch: refs/heads/camel-2.13.x
Commit: a032bc14305ab511aa286012c9c5773f6031a738
Parents: 46e52e7
Author: Claus Ibsen <da...@apache.org>
Authored: Tue May 6 09:55:38 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 6 09:55:58 2014 +0200
----------------------------------------------------------------------
.../org/apache/camel/impl/DefaultEndpoint.java | 62 +++++++-
.../camel/impl/EventDrivenPollingConsumer.java | 51 ++++++-
...EventDrivenPollingConsumerQueueSizeTest.java | 142 +++++++++++++++++++
3 files changed, 251 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a032bc14/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
index badd48e..2a6c5b0 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
@@ -36,6 +36,8 @@ import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A default endpoint useful for implementation inheritance.
@@ -52,6 +54,7 @@ import org.apache.camel.util.URISupport;
*/
public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint, HasId, CamelContextAware {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultEndpoint.class);
private String endpointUri;
private EndpointConfiguration endpointConfiguration;
private CamelContext camelContext;
@@ -64,6 +67,8 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
private boolean synchronous;
private final String id = EndpointHelper.createEndpointId();
private Map<String, Object> consumerProperties;
+ private int pollingConsumerQueueSize = 1000;
+ private boolean pollingConsumerBlockWhenFull = true;
/**
* Constructs a fully-initialized DefaultEndpoint instance. This is the
@@ -215,8 +220,11 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
}
public PollingConsumer createPollingConsumer() throws Exception {
- // should not configure consumer
- return new EventDrivenPollingConsumer(this);
+ // should not call configurePollingConsumer when it is an EventDrivenPollingConsumer
+ LOG.debug("Creating EventDrivenPollingConsumer with queueSize: {} and blockWhenFull: {}", getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull());
+ EventDrivenPollingConsumer consumer = new EventDrivenPollingConsumer(this, getPollingConsumerQueueSize());
+ consumer.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+ return consumer;
}
public Exchange createExchange(Exchange exchange) {
@@ -267,6 +275,56 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
this.synchronous = synchronous;
}
+ /**
+ * Gets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+ * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
+ * therefore do not use the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
+ * <p/>
+ * The default value is <tt>1000</tt>; a value of zero or less results in an unbounded queue.
+ */
+ public int getPollingConsumerQueueSize() {
+ return pollingConsumerQueueSize;
+ }
+
+ /**
+ * Sets the {@link org.apache.camel.PollingConsumer} queue size, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+ * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
+ * therefore do not use the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
+ * <p/>
+ * The default value is <tt>1000</tt>; a value of zero or less results in an unbounded queue.
+ */
+ public void setPollingConsumerQueueSize(int pollingConsumerQueueSize) {
+ this.pollingConsumerQueueSize = pollingConsumerQueueSize;
+ }
+
+ /**
+ * Whether to block when adding to the internal queue while it is full, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+ * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
+ * therefore do not use the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
+ * <p/>
+ * Setting this option to <tt>false</tt> will result in an {@link java.lang.IllegalStateException} being thrown
+ * when trying to add to the queue while it is full.
+ * <p/>
+ * The default value is <tt>true</tt>, which will block the producer until the queue has space.
+ */
+ public boolean isPollingConsumerBlockWhenFull() {
+ return pollingConsumerBlockWhenFull;
+ }
+
+ /**
+ * Sets whether to block when adding to the internal queue while it is full, when {@link org.apache.camel.impl.EventDrivenPollingConsumer}
+ * is being used. Notice some Camel components may have their own implementation of {@link org.apache.camel.PollingConsumer} and
+ * therefore do not use the default {@link org.apache.camel.impl.EventDrivenPollingConsumer} implementation.
+ * <p/>
+ * Setting this option to <tt>false</tt> will result in an {@link java.lang.IllegalStateException} being thrown
+ * when trying to add to the queue while it is full.
+ * <p/>
+ * The default value is <tt>true</tt>, which will block the producer until the queue has space.
+ */
+ public void setPollingConsumerBlockWhenFull(boolean pollingConsumerBlockWhenFull) {
+ this.pollingConsumerBlockWhenFull = pollingConsumerBlockWhenFull;
+ }
+
public void configureProperties(Map<String, Object> options) {
Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
if (consumerProperties != null && !consumerProperties.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/a032bc14/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index 07bcf07..428610e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -43,17 +44,53 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
private final BlockingQueue<Exchange> queue;
private ExceptionHandler interruptedExceptionHandler;
private Consumer consumer;
+ private boolean blockWhenFull = true;
+ private final int queueCapacity;
public EventDrivenPollingConsumer(Endpoint endpoint) {
- this(endpoint, new ArrayBlockingQueue<Exchange>(1000));
+ this(endpoint, 1000);
+ }
+
+ public EventDrivenPollingConsumer(Endpoint endpoint, int queueSize) {
+ super(endpoint);
+ this.queueCapacity = queueSize; // NOTE(review): stored as given, so getQueueCapacity() reports a non-positive value for an unbounded queue
+ if (queueSize <= 0) {
+ this.queue = new LinkedBlockingDeque<Exchange>(); // zero or negative size selects a queue without a capacity restriction
+ } else {
+ this.queue = new ArrayBlockingQueue<Exchange>(queueSize); // bounded queue with a fixed capacity of queueSize
+ }
+ this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
}
public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) {
super(endpoint);
this.queue = queue;
+ this.queueCapacity = queue.remainingCapacity(); // NOTE(review): equals the total capacity only if the supplied queue is empty
this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
}
+ public boolean isBlockWhenFull() {
+ return blockWhenFull;
+ }
+
+ public void setBlockWhenFull(boolean blockWhenFull) {
+ this.blockWhenFull = blockWhenFull;
+ }
+
+ /**
+ * Gets the queue capacity, as recorded when this consumer was constructed.
+ */
+ public int getQueueCapacity() {
+ return queueCapacity;
+ }
+
+ /**
+ * Gets the current queue size (number of elements in the queue).
+ */
+ public int getQueueSize() {
+ return queue.size();
+ }
+
public Exchange receiveNoWait() {
return receive(0);
}
@@ -98,7 +135,16 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
}
public void process(Exchange exchange) throws Exception {
- queue.offer(exchange);
+ if (isBlockWhenFull()) {
+ try {
+ queue.put(exchange); // waits for free space when the queue is full
+ } catch (InterruptedException e) {
+ // ignore; NOTE(review): the exchange was not enqueued and the thread's interrupt status is not restored here
+ log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
+ }
+ } else {
+ queue.add(exchange); // throws IllegalStateException when the queue is full
+ }
}
public ExceptionHandler getInterruptedExceptionHandler() {
@@ -155,5 +201,6 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownService(consumer);
+ queue.clear(); // discard any exchanges still waiting in the queue; a plain stop leaves them in place
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a032bc14/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java b/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java
new file mode 100644
index 0000000..06649a2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/EventDrivenPollingConsumerQueueSizeTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Map;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+public class EventDrivenPollingConsumerQueueSizeTest extends ContextTestSupport {
+
+ private String uri = "my:foo?pollingConsumerQueueSize=10&pollingConsumerBlockWhenFull=false";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ context.addComponent("my", new MyQueueComponent());
+ }
+
+ public void testQueueSize() throws Exception {
+ PollingConsumer consumer = context.getEndpoint(uri).createPollingConsumer();
+ consumer.start();
+
+ assertNotNull(consumer); // NOTE(review): runs after consumer.start() above, so a null consumer would already have failed with an NPE
+ EventDrivenPollingConsumer edpc = assertIsInstanceOf(EventDrivenPollingConsumer.class, consumer);
+ assertEquals(0, edpc.getQueueSize());
+ assertEquals(10, edpc.getQueueCapacity());
+ assertFalse(edpc.isBlockWhenFull());
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody(uri, "Message " + i);
+ }
+
+ assertEquals(10, edpc.getQueueSize());
+
+ try {
+ template.sendBody(uri, "Message 10");
+ fail("Should have thrown exception");
+ } catch (CamelExecutionException e) {
+ // the queue is full, so the non-blocking add is rejected
+ assertIsInstanceOf(IllegalStateException.class, e.getCause());
+ }
+
+ Exchange out = consumer.receive(5000);
+ assertNotNull(out);
+ assertEquals("Message 0", out.getIn().getBody());
+
+ assertEquals(9, edpc.getQueueSize());
+ assertEquals(10, edpc.getQueueCapacity());
+
+ // one exchange was received above, so there is room for one more
+ template.sendBody(uri, "Message 10");
+
+ assertEquals(10, edpc.getQueueSize());
+ assertEquals(10, edpc.getQueueCapacity());
+
+ ServiceHelper.stopService(consumer);
+ // the queue is not cleared when the consumer is only stopped
+ assertEquals(10, edpc.getQueueSize());
+ assertEquals(10, edpc.getQueueCapacity());
+
+ ServiceHelper.stopAndShutdownService(consumer);
+ // now it is cleared, as shutdown empties the queue
+ assertEquals(0, edpc.getQueueSize());
+ assertEquals(10, edpc.getQueueCapacity());
+ }
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ private final class MyQueueComponent extends DefaultComponent {
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ return new MyQueueEndpoint(uri, this);
+ }
+ }
+
+ private final class MyQueueEndpoint extends DefaultEndpoint {
+
+ private EventDrivenPollingConsumer consumer;
+
+ private MyQueueEndpoint(String endpointUri, Component component) {
+ super(endpointUri, component);
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return new DefaultProducer(this) {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ consumer.process(exchange);
+ }
+ };
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return consumer;
+ }
+
+ @Override
+ public PollingConsumer createPollingConsumer() throws Exception {
+ return consumer;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ consumer = (EventDrivenPollingConsumer) super.createPollingConsumer();
+ super.doStart();
+ }
+ }
+}