You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/02/14 12:53:54 UTC
[camel] 09/23: CAMEL-15105: rework handling the unit of work
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 57d5f57077add6e71884e26e08c1fea9528d3bd7
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Nov 23 09:11:11 2022 +0100
CAMEL-15105: rework handling the unit of work
---
.../src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java | 3 +--
.../org/apache/camel/component/jetty/CamelContinuationServlet.java | 4 +---
.../microprofile/faulttolerance/FaultToleranceProcessor.java | 2 +-
.../apache/camel/component/resilience4j/ResilienceProcessor.java | 2 +-
.../java/org/apache/camel/impl/engine/CamelInternalProcessor.java | 6 ++----
.../main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java | 3 +--
.../org/apache/camel/converter/stream/CachedOutputStreamTest.java | 3 +--
.../java/org/apache/camel/support/EventDrivenPollingConsumer.java | 2 +-
8 files changed, 9 insertions(+), 16 deletions(-)
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 5c909c56021..ba7f5dfc9b2 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -30,7 +30,6 @@ import javax.security.auth.login.Configuration;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
@@ -251,7 +250,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) {
// do not share unit of work
- exchange.adapt(ExtendedExchange.class).setUnitOfWork(null);
+ exchange.getExchangeExtension().setUnitOfWork(null);
exchange.setProperty(ExchangePropertyKey.SPLIT_INDEX, index);
diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
index c4fece46b23..82f1f428de6 100644
--- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
+++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
@@ -33,7 +33,6 @@ import jakarta.servlet.http.HttpServletResponse;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.http.common.CamelServlet;
import org.apache.camel.http.common.HttpCommonEndpoint;
@@ -239,8 +238,7 @@ public class CamelContinuationServlet extends CamelServlet {
}
} else if (uow.onPrepare(exchange)) {
// need to re-attach uow
- ExtendedExchange ee = (ExtendedExchange) exchange;
- ee.setUnitOfWork(uow);
+ exchange.getExchangeExtension().setUnitOfWork(uow);
}
ClassLoader oldTccl = overrideTccl(exchange);
diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 8ccaca4bab2..6c148e068ea 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -445,7 +445,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
} else {
// prepare uow on copy
uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
- copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+ copy.getExchangeExtension().setUnitOfWork(uow);
// the copy must be starting from the route where its copied from
Route route = ExchangeHelper.getRoute(exchange);
if (route != null) {
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 04d755d1e07..83234a6b165 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -526,7 +526,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
} else {
// prepare uow on copy
uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
- copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+ copy.getExchangeExtension().setUnitOfWork(uow);
// the copy must be starting from the route where its copied from
Route route = ExchangeHelper.getRoute(exchange);
if (route != null) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 7f9630868bf..ff2e6c51067 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -782,15 +782,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
// If there is no existing UoW, then we should start one and
// terminate it once processing is completed for the exchange.
created = createUnitOfWork(exchange);
- ExtendedExchange ee = (ExtendedExchange) exchange;
- ee.setUnitOfWork(created);
+ exchange.getExchangeExtension().setUnitOfWork(created);
uow = created;
} else {
// reuse existing exchange
if (uow.onPrepare(exchange)) {
// need to re-attach uow
- ExtendedExchange ee = (ExtendedExchange) exchange;
- ee.setUnitOfWork(uow);
+ exchange.getExchangeExtension().setUnitOfWork(uow);
// we are prepared for reuse and can regard it as-if we created the unit of work
// so the after method knows that this is the outer bounds and should done the unit of work
created = uow;
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 5ddfd782329..3c6cb7af9d6 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -275,8 +275,7 @@ public class DefaultUnitOfWork implements UnitOfWork {
protected void onDone() {
// MUST clear and set uow to null on exchange after done
// in case the same exchange is manually reused by Camel end users (should happen seldom)
- ExtendedExchange ee = (ExtendedExchange) exchange;
- ee.setUnitOfWork(null);
+ exchange.getExchangeExtension().setUnitOfWork(null);
}
@Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
index 916d9c49e46..0b13f65e153 100644
--- a/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
@@ -28,7 +28,6 @@ import java.util.StringJoiner;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.StreamCache;
import org.apache.camel.converter.IOConverter;
import org.apache.camel.impl.engine.DefaultUnitOfWork;
@@ -66,7 +65,7 @@ public class CachedOutputStreamTest extends ContextTestSupport {
exchange = new DefaultExchange(context);
UnitOfWork uow = new DefaultUnitOfWork(exchange);
- exchange.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+ exchange.getExchangeExtension().setUnitOfWork(uow);
}
@Override
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
index 620b98fbe52..55ab8afbb6e 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
@@ -213,7 +213,7 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
// we want the copy to have an uow
UnitOfWork uow = getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory()
.createUnitOfWork(copy);
- copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+ copy.getExchangeExtension().setUnitOfWork(uow);
return copy;
}