You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/20 16:33:10 UTC
[camel] 01/02: CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6a95f58c5c43d83492c0ef2ea42db880f5308a04
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jun 20 16:27:16 2022 +0200
CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
---
.../main/java/org/apache/camel/PooledExchange.java | 2 +-
.../camel/impl/engine/DefaultUnitOfWork.java | 2 +-
.../camel/impl/engine/PooledExchangeFactory.java | 24 +++++++++++---------
.../engine/PooledProcessorExchangeFactory.java | 26 +++++++++++++---------
.../org/apache/camel/support/DefaultConsumer.java | 9 ++++++--
.../camel/support/DefaultPooledExchange.java | 6 ++---
6 files changed, 40 insertions(+), 29 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
index 764ed2b2ad8..0c471a78a51 100644
--- a/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
@@ -45,7 +45,7 @@ public interface PooledExchange extends ExtendedExchange {
* <p/>
* <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
*/
- void done(boolean forced);
+ void done();
/**
* Resets the exchange for reuse with the given created timestamp;
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 80b08c3634d..36821bbfc1a 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -259,7 +259,7 @@ public class DefaultUnitOfWork implements UnitOfWork {
// pooled exchange has its own done logic which will reset this uow for reuse
// so do not call onDone
try {
- ((PooledExchange) exchange).done(false);
+ ((PooledExchange) exchange).done();
} catch (Throwable e) {
// must catch exceptions to ensure synchronizations is also invoked
log.warn("Exception occurred during exchange done. This exception will be ignored.", e);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 1f9995617d0..a02ca09b438 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -44,7 +44,7 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
@Override
protected void doBuild() throws Exception {
super.doBuild();
- // force to create and load the class during build time so the JVM does not
+ // force creating and loading the class during build time so the JVM does not
// load the class on first exchange to be created
DefaultPooledExchange dummy = new DefaultPooledExchange(camelContext);
// force message init to load classes
@@ -75,10 +75,12 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
if (statisticsEnabled) {
statistics.acquired.increment();
}
- // reset exchange for reuse
- PooledExchange ee = (PooledExchange) exchange;
- ee.reset(System.currentTimeMillis());
}
+
+ // reset exchange for reuse
+ PooledExchange ee = (PooledExchange) exchange;
+ ee.reset(System.currentTimeMillis());
+
return exchange;
}
@@ -95,21 +97,21 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
if (statisticsEnabled) {
statistics.acquired.increment();
}
- // reset exchange for reuse
- PooledExchange ee = (PooledExchange) exchange;
- ee.reset(System.currentTimeMillis());
}
+
+ // reset exchange for reuse
+ PooledExchange ee = (PooledExchange) exchange;
+ ee.reset(System.currentTimeMillis());
+
return exchange;
}
@Override
public boolean release(Exchange exchange) {
try {
- // done exchange before returning back to pool
+ // done exchange before returning to pool
PooledExchange ee = (PooledExchange) exchange;
- boolean force = !ee.isAutoRelease();
- ee.done(force);
- ee.onDone(null);
+ ee.done();
// only release back in pool if reset was success
boolean inserted = pool.offer(exchange);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index ed4620d9e6d..48dcc697cbf 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -70,11 +70,12 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
if (statisticsEnabled) {
statistics.acquired.increment();
}
- // reset exchange for reuse
- PooledExchange ee = (PooledExchange) answer;
- ee.reset(System.currentTimeMillis());
}
+ // reset exchange for reuse
+ PooledExchange ee = (PooledExchange) answer;
+ ee.reset(System.currentTimeMillis());
+
ExchangeHelper.copyResults(answer, exchange);
return answer;
}
@@ -95,11 +96,12 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
if (statisticsEnabled) {
statistics.acquired.increment();
}
- // reset exchange for reuse
- PooledExchange ee = (PooledExchange) answer;
- ee.reset(System.currentTimeMillis());
}
+ // reset exchange for reuse
+ PooledExchange ee = (PooledExchange) answer;
+ ee.reset(System.currentTimeMillis());
+
ExchangeHelper.copyResults(answer, exchange);
// do not reuse message id on copy
answer.getIn().setMessageId(null);
@@ -125,19 +127,21 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
if (statisticsEnabled) {
statistics.acquired.increment();
}
- // reset exchange for reuse
- PooledExchange ee = (PooledExchange) answer;
- ee.reset(System.currentTimeMillis());
}
+
+ // reset exchange for reuse
+ PooledExchange ee = (PooledExchange) answer;
+ ee.reset(System.currentTimeMillis());
+
return answer;
}
@Override
public boolean release(Exchange exchange) {
try {
- // done exchange before returning back to pool
+ // done exchange before returning to pool
PooledExchange ee = (PooledExchange) exchange;
- ee.done(true);
+ ee.done();
// only release back in pool if reset was success
boolean inserted = pool.offer(exchange);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 8bbf6e3aefc..d3d97b3fa8a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -144,7 +144,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
if (exchange != null) {
if (!autoRelease && exchange instanceof PooledExchange) {
// if not auto release we must manually force done
- ((PooledExchange) exchange).done(true);
+ ((PooledExchange) exchange).done();
}
exchangeFactory.release(exchange);
}
@@ -263,11 +263,13 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
private final DefaultConsumer consumer;
private final Exchange exchange;
+ private final boolean pooled;
private final boolean autoRelease;
public DefaultConsumerCallback(DefaultConsumer consumer, Exchange exchange, boolean autoRelease) {
this.consumer = consumer;
this.exchange = exchange;
+ this.pooled = exchange instanceof PooledExchange;
this.autoRelease = autoRelease;
}
@@ -280,7 +282,10 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
exchange.getException());
}
} finally {
- consumer.releaseExchange(exchange, autoRelease);
+ if (!autoRelease) {
+ // must release if not auto released
+ consumer.releaseExchange(exchange, autoRelease);
+ }
}
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index a8aa4574b67..41b53eb0e83 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -31,7 +31,7 @@ import org.apache.camel.PooledExchange;
public final class DefaultPooledExchange extends AbstractExchange implements PooledExchange {
private OnDoneTask onDone;
- private Class originalInClassType;
+ private Class<?> originalInClassType;
private Message originalOut;
private final ExchangePattern originalPattern;
private boolean autoRelease;
@@ -79,8 +79,8 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
this.onDone = task;
}
- public void done(boolean forced) {
- if (created > 0 && (forced || autoRelease)) {
+ public void done() {
+ if (created > 0) {
this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again
this.properties.clear();
// reset array by copying over from empty which is a very fast JVM optimized operation