You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/04/26 18:31:28 UTC

[camel] branch main updated: CAMEL-15105: cleaned up handling the unit of work and onCompletions

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 5443df2c033 CAMEL-15105: cleaned up handling the unit of work and onCompletions
5443df2c033 is described below

commit 5443df2c0333469df47e37a2d50fcef52c1ff41c
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Apr 26 19:20:19 2023 +0200

    CAMEL-15105: cleaned up handling the unit of work and onCompletions
---
 .../org/apache/camel/support/AbstractExchange.java | 68 +---------------------
 .../camel/support/DefaultPooledExchange.java       |  6 --
 .../camel/support/ExtendedExchangeExtension.java   | 67 +++++++++++++++++++--
 3 files changed, 64 insertions(+), 77 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index ba484068884..096f65eb59c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -66,9 +66,7 @@ class AbstractExchange implements Exchange {
     Message out;
     Exception exception;
     String exchangeId;
-    UnitOfWork unitOfWork;
     ExchangePattern pattern;
-    List<Synchronization> onCompletions;
     Boolean externalRedelivered;
     boolean routeStop;
     boolean rollbackOnly;
@@ -92,11 +90,11 @@ class AbstractExchange implements Exchange {
         this.context = parent.getContext();
         this.pattern = parent.getPattern();
         this.created = parent.getCreated();
-        this.unitOfWork = parent.getUnitOfWork();
 
         privateExtension = new ExtendedExchangeExtension(this);
         privateExtension.setFromEndpoint(parent.getFromEndpoint());
         privateExtension.setFromRouteId(parent.getFromRouteId());
+        privateExtension.setUnitOfWork(parent.getUnitOfWork());
     }
 
     public AbstractExchange(Endpoint fromEndpoint) {
@@ -678,69 +676,7 @@ class AbstractExchange implements Exchange {
 
     @Override
     public UnitOfWork getUnitOfWork() {
-        return unitOfWork;
-    }
-
-    void setUnitOfWork(UnitOfWork unitOfWork) {
-        this.unitOfWork = unitOfWork;
-        if (unitOfWork != null && onCompletions != null) {
-            // now an unit of work has been assigned so add the on completions
-            // we might have registered already
-            for (Synchronization onCompletion : onCompletions) {
-                unitOfWork.addSynchronization(onCompletion);
-            }
-            // cleanup the temporary on completion list as they now have been registered
-            // on the unit of work
-            onCompletions.clear();
-            onCompletions = null;
-        }
-    }
-
-    void addOnCompletion(Synchronization onCompletion) {
-        if (unitOfWork == null) {
-            // unit of work not yet registered so we store the on completion temporary
-            // until the unit of work is assigned to this exchange by the unit of work
-            if (onCompletions == null) {
-                onCompletions = new ArrayList<>();
-            }
-            onCompletions.add(onCompletion);
-        } else {
-            getUnitOfWork().addSynchronization(onCompletion);
-        }
-    }
-
-    boolean containsOnCompletion(Synchronization onCompletion) {
-        if (unitOfWork != null) {
-            // if there is an unit of work then the completions is moved there
-            return unitOfWork.containsSynchronization(onCompletion);
-        } else {
-            // check temporary completions if no unit of work yet
-            return onCompletions != null && onCompletions.contains(onCompletion);
-        }
-    }
-
-    void handoverCompletions(Exchange target) {
-        if (onCompletions != null) {
-            for (Synchronization onCompletion : onCompletions) {
-                target.getExchangeExtension().addOnCompletion(onCompletion);
-            }
-            // cleanup the temporary on completion list as they have been handed over
-            onCompletions.clear();
-            onCompletions = null;
-        } else if (unitOfWork != null) {
-            // let unit of work handover
-            unitOfWork.handoverSynchronization(target);
-        }
-    }
-
-    List<Synchronization> handoverCompletions() {
-        List<Synchronization> answer = null;
-        if (onCompletions != null) {
-            answer = new ArrayList<>(onCompletions);
-            onCompletions.clear();
-            onCompletions = null;
-        }
-        return answer;
+        return privateExtension.getUnitOfWork();
     }
 
     /**
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index 21ac295a2b7..958b6b92179 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -99,15 +99,9 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
                 out.reset();
                 this.out = null;
             }
-            if (this.unitOfWork != null) {
-                this.unitOfWork.reset();
-            }
             this.exception = null;
             // reset pattern to original
             this.pattern = originalPattern;
-            if (this.onCompletions != null) {
-                this.onCompletions.clear();
-            }
             // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again
             this.externalRedelivered = null;
             this.routeStop = false;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
index 8080af78e0c..f1f13fb139a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.support;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -44,6 +45,8 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
     private boolean interruptable = true;
     private boolean interrupted;
     private AsyncCallback defaultConsumerCallback; // optimize (do not reset)
+    private UnitOfWork unitOfWork;
+    private List<Synchronization> onCompletions;
 
     ExtendedExchangeExtension(AbstractExchange exchange) {
         this.exchange = exchange;
@@ -84,7 +87,16 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
 
     @Override
     public void addOnCompletion(Synchronization onCompletion) {
-        this.exchange.addOnCompletion(onCompletion);
+        if (unitOfWork == null) {
+            // unit of work not yet registered so we store the on completion temporary
+            // until the unit of work is assigned to this exchange by the unit of work
+            if (onCompletions == null) {
+                onCompletions = new ArrayList<>();
+            }
+            onCompletions.add(onCompletion);
+        } else {
+            unitOfWork.addSynchronization(onCompletion);
+        }
     }
 
     @Override
@@ -119,17 +131,44 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
 
     @Override
     public void handoverCompletions(Exchange target) {
-        this.exchange.handoverCompletions(target);
+        if (onCompletions != null) {
+            for (Synchronization onCompletion : onCompletions) {
+                target.getExchangeExtension().addOnCompletion(onCompletion);
+            }
+            // cleanup the temporary on completion list as they have been handed over
+            onCompletions.clear();
+            onCompletions = null;
+        } else if (unitOfWork != null) {
+            // let unit of work handover
+            unitOfWork.handoverSynchronization(target);
+        }
     }
 
     @Override
     public List<Synchronization> handoverCompletions() {
-        return this.exchange.handoverCompletions();
+        List<Synchronization> answer = null;
+        if (onCompletions != null) {
+            answer = new ArrayList<>(onCompletions);
+            onCompletions.clear();
+            onCompletions = null;
+        }
+        return answer;
     }
 
     @Override
     public void setUnitOfWork(UnitOfWork unitOfWork) {
-        this.exchange.setUnitOfWork(unitOfWork);
+        this.unitOfWork = unitOfWork;
+        if (unitOfWork != null && onCompletions != null) {
+            // now an unit of work has been assigned so add the on completions
+            // we might have registered already
+            for (Synchronization onCompletion : onCompletions) {
+                unitOfWork.addSynchronization(onCompletion);
+            }
+            // cleanup the temporary on completion list as they now have been registered
+            // on the unit of work
+            onCompletions.clear();
+            onCompletions = null;
+        }
     }
 
     @Override
@@ -189,7 +228,13 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
 
     @Override
     public boolean containsOnCompletion(Synchronization onCompletion) {
-        return this.exchange.containsOnCompletion(onCompletion);
+        if (unitOfWork != null) {
+            // if there is an unit of work then the completions is moved there
+            return unitOfWork.containsSynchronization(onCompletion);
+        } else {
+            // check temporary completions if no unit of work yet
+            return onCompletions != null && onCompletions.contains(onCompletion);
+        }
     }
 
     @Override
@@ -253,7 +298,19 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
         this.failureHandled = failureHandled;
     }
 
+    public UnitOfWork getUnitOfWork() {
+        return unitOfWork;
+    }
+
     public void reset() {
+        if (this.unitOfWork != null) {
+            this.unitOfWork.reset();
+        }
+
+        if (this.onCompletions != null) {
+            this.onCompletions.clear();
+        }
+
         setHistoryNodeId(null);
         setHistoryNodeLabel(null);
         setTransacted(false);