You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/12 13:12:19 UTC

[camel] branch master updated (7b1f358 -> 6cdee84)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 7b1f358  CAMEL-16342 Upgrade common-compression to 1.20 and fix camel-tarfile accordingly (#5198)
     new 5d8f66f  CAMEL-16331: camel-core - DefaultConsumer used as PollingConsumer EIP should not use pooled exchanges
     new 6cdee84  CAMEL-16331: camel-core - DefaultConsumer used as PollingConsumer EIP should not use pooled exchanges. Also fix a small bug in ExchangeHelper copy which did not hand over onCompletions if UoW was null on target.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java     | 11 -----------
 .../apache/camel/component/paho/PahoToDSendDynamicTest.java   | 11 -----------
 .../main/java/org/apache/camel/processor/PollEnricher.java    |  6 +++++-
 .../file/FileConsumePollEnrichFileUsingProcessorTest.java     |  3 +++
 .../org/apache/camel/support/EventDrivenPollingConsumer.java  |  7 ++++++-
 .../main/java/org/apache/camel/support/ExchangeHelper.java    |  9 +++------
 6 files changed, 17 insertions(+), 30 deletions(-)


[camel] 01/02: CAMEL-16331: camel-core - DefaultConsumer used as PollingConsumer EIP should not use pooled exchanges

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5d8f66f02f9dd5356069777c3403023775306cd9
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Mar 12 13:12:06 2021 +0100

    CAMEL-16331: camel-core - DefaultConsumer used as PollingConsumer EIP should not use pooled exchanges
---
 .../component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java     | 11 -----------
 .../apache/camel/component/paho/PahoToDSendDynamicTest.java   | 11 -----------
 .../org/apache/camel/support/EventDrivenPollingConsumer.java  |  7 ++++++-
 3 files changed, 6 insertions(+), 23 deletions(-)

diff --git a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java
index cea8d54..1c13670 100644
--- a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java
+++ b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5ToDSendDynamicTest.java
@@ -16,24 +16,13 @@
  */
 package org.apache.camel.component.paho.mqtt5;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.engine.PrototypeExchangeFactory;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class PahoMqtt5ToDSendDynamicTest extends PahoMqtt5TestSupport {
 
-    @Override
-    protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        // this test must use prototype scope as we use pooling consumer
-        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PrototypeExchangeFactory());
-        return context;
-    }
-
     @Test
     public void testToD() throws Exception {
         template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
index 0d2d662..4deb2c7 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java
@@ -17,10 +17,7 @@
 package org.apache.camel.component.paho;
 
 import org.apache.activemq.broker.BrokerService;
-import org.apache.camel.CamelContext;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.engine.PrototypeExchangeFactory;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.AfterEach;
@@ -35,14 +32,6 @@ public class PahoToDSendDynamicTest extends CamelTestSupport {
     int mqttPort = AvailablePortFinder.getNextAvailable();
 
     @Override
-    protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        // this test must use prototype scope as we use pooling consumer
-        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PrototypeExchangeFactory());
-        return context;
-    }
-
-    @Override
     protected boolean useJmx() {
         return false;
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
index 011b35d..bd15b3b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
@@ -30,6 +30,7 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.IsSingleton;
 import org.apache.camel.PollingConsumerPollingStrategy;
+import org.apache.camel.PooledExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.UnitOfWork;
@@ -174,7 +175,11 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        if (isCopy()) {
+        // we must make a copy as the exchange put on queue is consumed by another part
+        // and it would not reset and return the pooled exchange to the pool
+        boolean pooled = exchange instanceof PooledExchange;
+
+        if (isCopy() || pooled) {
             // if we copy then we handover completion
             exchange = prepareCopy(exchange, true);
         }


[camel] 02/02: CAMEL-16331: camel-core - DefaultConsumer used as PollingConsumer EIP should not use pooled exchanges. Also fix a small bug in ExchangeHelper copy which did not hand over onCompletions if UoW was null on target.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6cdee841318bf88f7a389871e95bb1ba25a23323
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Mar 12 14:06:08 2021 +0100

    CAMEL-16331: camel-core - DefaultConsumer used as PollingConsumer EIP should not use pooled exchanges. Also fix a small bug in ExchangeHelper copy which did not hand over onCompletions if UoW was null on target.
---
 .../src/main/java/org/apache/camel/processor/PollEnricher.java   | 6 +++++-
 .../file/FileConsumePollEnrichFileUsingProcessorTest.java        | 3 +++
 .../src/main/java/org/apache/camel/support/ExchangeHelper.java   | 9 +++------
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 1a03637..e9ea51e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -438,7 +438,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
     }
 
     @Override
-    protected void doStart() throws Exception {
+    protected void doBuild() throws Exception {
         if (consumerCache == null) {
             // create consumer cache if we use dynamic expressions for computing the endpoints to poll
             consumerCache = new DefaultConsumerCache(this, camelContext, cacheSize);
@@ -447,6 +447,10 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         if (aggregationStrategy instanceof CamelContextAware) {
             ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
         }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
         ServiceHelper.startService(consumerCache, aggregationStrategy);
     }
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java
index 305f7b3..6c66cf1 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java
@@ -86,6 +86,9 @@ public class FileConsumePollEnrichFileUsingProcessorTest extends ContextTestSupp
                             // otherwise do a rollback
                             throw new CamelExchangeException("Cannot find the data file " + name, exchange);
                         }
+
+                        // and remember to done the UoW
+                        data.getUnitOfWork().done(data);
                     }
                 }).to("mock:start");
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 4a3d45d..42d8408 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -308,12 +308,9 @@ public final class ExchangeHelper {
         // do not share the unit of work
         ExtendedExchange ce = (ExtendedExchange) copy;
         ce.setUnitOfWork(null);
-
-        // do not reuse the message id
-        // hand over on completion to the copy if we got any
-        UnitOfWork uow = exchange.getUnitOfWork();
-        if (handover && uow != null) {
-            uow.handoverSynchronization(copy, filter);
+        if (handover) {
+            // Need to hand over the completion for async invocation
+            exchange.adapt(ExtendedExchange.class).handoverCompletions(ce);
         }
         // set a correlation id so we can track back the original exchange
         copy.setProperty(ExchangePropertyKey.CORRELATION_ID, id);