You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/02/14 12:53:54 UTC

[camel] 09/23: CAMEL-15105: rework handling the unit of work

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 57d5f57077add6e71884e26e08c1fea9528d3bd7
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Nov 23 09:11:11 2022 +0100

    CAMEL-15105: rework handling the unit of work
---
 .../src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java | 3 +--
 .../org/apache/camel/component/jetty/CamelContinuationServlet.java  | 4 +---
 .../microprofile/faulttolerance/FaultToleranceProcessor.java        | 2 +-
 .../apache/camel/component/resilience4j/ResilienceProcessor.java    | 2 +-
 .../java/org/apache/camel/impl/engine/CamelInternalProcessor.java   | 6 ++----
 .../main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java   | 3 +--
 .../org/apache/camel/converter/stream/CachedOutputStreamTest.java   | 3 +--
 .../java/org/apache/camel/support/EventDrivenPollingConsumer.java   | 2 +-
 8 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 5c909c56021..ba7f5dfc9b2 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -30,7 +30,6 @@ import javax.security.auth.login.Configuration;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
@@ -251,7 +250,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
 
     protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) {
         // do not share unit of work
-        exchange.adapt(ExtendedExchange.class).setUnitOfWork(null);
+        exchange.getExchangeExtension().setUnitOfWork(null);
 
         exchange.setProperty(ExchangePropertyKey.SPLIT_INDEX, index);
 
diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
index c4fece46b23..82f1f428de6 100644
--- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
+++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
@@ -33,7 +33,6 @@ import jakarta.servlet.http.HttpServletResponse;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.http.common.CamelServlet;
 import org.apache.camel.http.common.HttpCommonEndpoint;
@@ -239,8 +238,7 @@ public class CamelContinuationServlet extends CamelServlet {
                 }
             } else if (uow.onPrepare(exchange)) {
                 // need to re-attach uow
-                ExtendedExchange ee = (ExtendedExchange) exchange;
-                ee.setUnitOfWork(uow);
+                exchange.getExchangeExtension().setUnitOfWork(uow);
             }
 
             ClassLoader oldTccl = overrideTccl(exchange);
diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 8ccaca4bab2..6c148e068ea 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -445,7 +445,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                 } else {
                     // prepare uow on copy
                     uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
-                    copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+                    copy.getExchangeExtension().setUnitOfWork(uow);
                     // the copy must be starting from the route where its copied from
                     Route route = ExchangeHelper.getRoute(exchange);
                     if (route != null) {
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 04d755d1e07..83234a6b165 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -526,7 +526,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
             } else {
                 // prepare uow on copy
                 uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
-                copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+                copy.getExchangeExtension().setUnitOfWork(uow);
                 // the copy must be starting from the route where its copied from
                 Route route = ExchangeHelper.getRoute(exchange);
                 if (route != null) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 7f9630868bf..ff2e6c51067 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -782,15 +782,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
                 // If there is no existing UoW, then we should start one and
                 // terminate it once processing is completed for the exchange.
                 created = createUnitOfWork(exchange);
-                ExtendedExchange ee = (ExtendedExchange) exchange;
-                ee.setUnitOfWork(created);
+                exchange.getExchangeExtension().setUnitOfWork(created);
                 uow = created;
             } else {
                 // reuse existing exchange
                 if (uow.onPrepare(exchange)) {
                     // need to re-attach uow
-                    ExtendedExchange ee = (ExtendedExchange) exchange;
-                    ee.setUnitOfWork(uow);
+                    exchange.getExchangeExtension().setUnitOfWork(uow);
                     // we are prepared for reuse and can regard it as-if we created the unit of work
                     // so the after method knows that this is the outer bounds and should done the unit of work
                     created = uow;
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 5ddfd782329..3c6cb7af9d6 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -275,8 +275,7 @@ public class DefaultUnitOfWork implements UnitOfWork {
     protected void onDone() {
         // MUST clear and set uow to null on exchange after done
         // in case the same exchange is manually reused by Camel end users (should happen seldom)
-        ExtendedExchange ee = (ExtendedExchange) exchange;
-        ee.setUnitOfWork(null);
+        exchange.getExchangeExtension().setUnitOfWork(null);
     }
 
     @Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
index 916d9c49e46..0b13f65e153 100644
--- a/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
@@ -28,7 +28,6 @@ import java.util.StringJoiner;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.converter.IOConverter;
 import org.apache.camel.impl.engine.DefaultUnitOfWork;
@@ -66,7 +65,7 @@ public class CachedOutputStreamTest extends ContextTestSupport {
 
         exchange = new DefaultExchange(context);
         UnitOfWork uow = new DefaultUnitOfWork(exchange);
-        exchange.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+        exchange.getExchangeExtension().setUnitOfWork(uow);
     }
 
     @Override
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
index 620b98fbe52..55ab8afbb6e 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java
@@ -213,7 +213,7 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
         // we want the copy to have an uow
         UnitOfWork uow = getEndpoint().getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory()
                 .createUnitOfWork(copy);
-        copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+        copy.getExchangeExtension().setUnitOfWork(uow);
 
         return copy;
     }