You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/12 13:12:19 UTC
[camel] branch master updated (7b1f358 -> 6cdee84)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.
from 7b1f358 CAMEL-16342 Upgrade common-compression to 1.20 and fix camel-tarfile accordingly (#5198)
new 5d8f66f CAMEL-16331: camel-core - DefaultConsumer used as PolllingConsumer EIP should not be pooled exchanges
new 6cdee84 CAMEL-16331: camel-core - DefaultConsumer used as PolllingConsumer EIP should not be pooled exchanges. Also fix little bug in ExchangeHelper copy which did not handover onCompletions if UoW was null on target.
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java | 11 -----------
.../apache/camel/component/paho/PahoToDSendDynamicTest.java | 11 -----------
.../main/java/org/apache/camel/processor/PollEnricher.java | 6 +++++-
.../file/FileConsumePollEnrichFileUsingProcessorTest.java | 3 +++
.../org/apache/camel/support/EventDrivenPollingConsumer.java | 7 ++++++-
.../main/java/org/apache/camel/support/ExchangeHelper.java | 9 +++------
6 files changed, 17 insertions(+), 30 deletions(-)
[camel] 01/02: CAMEL-16331: camel-core - DefaultConsumer used as
PolllingConsumer EIP should not be pooled exchanges
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5d8f66f02f9dd5356069777c3403023775306cd9
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Mar 12 13:12:06 2021 +0100
CAMEL-16331: camel-core - DefaultConsumer used as PolllingConsumer EIP should not be pooled exchanges
---
.../component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java | 11 -----------
.../apache/camel/component/paho/PahoToDSendDynamicTest.java | 11 -----------
.../org/apache/camel/support/EventDrivenPollingConsumer.java | 7 ++++++-
3 files changed, 6 insertions(+), 23 deletions(-)
diff --git a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java
index cea8d54..1c13670 100644
--- a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java
+++ b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java
@@ -16,24 +16,13 @@
*/
package org.apache.camel.component.paho.mqtt5;
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.engine.PrototypeExchangeFactory;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PahoMqtt5ToDSendDynamicTest extends PahoMqtt5TestSupport {
- @Override
- protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- // this test must use prototype scope as we use pooling consumer
- context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PrototypeExchangeFactory());
- return context;
- }
-
@Test
public void testToD() throws Exception {
template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
index 0d2d662..4deb2c7 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
@@ -17,10 +17,7 @@
package org.apache.camel.component.paho;
import org.apache.activemq.broker.BrokerService;
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.engine.PrototypeExchangeFactory;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.AfterEach;
@@ -35,14 +32,6 @@ public class PahoToDSendDynamicTest extends CamelTestSupport {
int mqttPort = AvailablePortFinder.getNextAvailable();
@Override
- protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- // this test must use prototype scope as we use pooling consumer
- context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PrototypeExchangeFactory());
- return context;
- }
-
- @Override
protected boolean useJmx() {
return false;
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
index 011b35d..bd15b3b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
@@ -30,6 +30,7 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumerPollingStrategy;
+import org.apache.camel.PooledExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.UnitOfWork;
@@ -174,7 +175,11 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
@Override
public void process(Exchange exchange) throws Exception {
- if (isCopy()) {
+ // we must make a copy as the exchange put on queue is consumed by another part
+ // and it would not reset and return the pooled exchange to the pool
+ boolean pooled = exchange instanceof PooledExchange;
+
+ if (isCopy() || pooled) {
// if we copy then we handover completion
exchange = prepareCopy(exchange, true);
}
[camel] 02/02: CAMEL-16331: camel-core - DefaultConsumer used as
PolllingConsumer EIP should not be pooled exchanges. Also fix little bug in
ExchangeHelper copy which did not handover onCompletions if UoW was null on
target.
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6cdee841318bf88f7a389871e95bb1ba25a23323
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Mar 12 14:06:08 2021 +0100
CAMEL-16331: camel-core - DefaultConsumer used as PolllingConsumer EIP should not be pooled exchanges. Also fix little bug in ExchangeHelper copy which did not handover onCompletions if UoW was null on target.
---
.../src/main/java/org/apache/camel/processor/PollEnricher.java | 6 +++++-
.../file/FileConsumePollEnrichFileUsingProcessorTest.java | 3 +++
.../src/main/java/org/apache/camel/support/ExchangeHelper.java | 9 +++------
3 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 1a03637..e9ea51e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -438,7 +438,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
}
@Override
- protected void doStart() throws Exception {
+ protected void doBuild() throws Exception {
if (consumerCache == null) {
// create consumer cache if we use dynamic expressions for computing the endpoints to poll
consumerCache = new DefaultConsumerCache(this, camelContext, cacheSize);
@@ -447,6 +447,10 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
if (aggregationStrategy instanceof CamelContextAware) {
((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
}
+ }
+
+ @Override
+ protected void doStart() throws Exception {
ServiceHelper.startService(consumerCache, aggregationStrategy);
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java
index 305f7b3..6c66cf1 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java
@@ -86,6 +86,9 @@ public class FileConsumePollEnrichFileUsingProcessorTest extends ContextTestSupp
// otherwise do a rollback
throw new CamelExchangeException("Cannot find the data file " + name, exchange);
}
+
+ // and remember to done the UoW
+ data.getUnitOfWork().done(data);
}
}).to("mock:start");
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 4a3d45d..42d8408 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -308,12 +308,9 @@ public final class ExchangeHelper {
// do not share the unit of work
ExtendedExchange ce = (ExtendedExchange) copy;
ce.setUnitOfWork(null);
-
- // do not reuse the message id
- // hand over on completion to the copy if we got any
- UnitOfWork uow = exchange.getUnitOfWork();
- if (handover && uow != null) {
- uow.handoverSynchronization(copy, filter);
+ if (handover) {
+ // Need to hand over the completion for async invocation
+ exchange.adapt(ExtendedExchange.class).handoverCompletions(ce);
}
// set a correlation id so we can track back the original exchange
copy.setProperty(ExchangePropertyKey.CORRELATION_ID, id);