You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/04/26 18:31:28 UTC
[camel] branch main updated: CAMEL-15105: cleaned up handling the unit of work and onCompletions
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 5443df2c033 CAMEL-15105: cleaned up handling the unit of work and onCompletions
5443df2c033 is described below
commit 5443df2c0333469df47e37a2d50fcef52c1ff41c
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Apr 26 19:20:19 2023 +0200
CAMEL-15105: cleaned up handling the unit of work and onCompletions
---
.../org/apache/camel/support/AbstractExchange.java | 68 +---------------------
.../camel/support/DefaultPooledExchange.java | 6 --
.../camel/support/ExtendedExchangeExtension.java | 67 +++++++++++++++++++--
3 files changed, 64 insertions(+), 77 deletions(-)
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index ba484068884..096f65eb59c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -66,9 +66,7 @@ class AbstractExchange implements Exchange {
Message out;
Exception exception;
String exchangeId;
- UnitOfWork unitOfWork;
ExchangePattern pattern;
- List<Synchronization> onCompletions;
Boolean externalRedelivered;
boolean routeStop;
boolean rollbackOnly;
@@ -92,11 +90,11 @@ class AbstractExchange implements Exchange {
this.context = parent.getContext();
this.pattern = parent.getPattern();
this.created = parent.getCreated();
- this.unitOfWork = parent.getUnitOfWork();
privateExtension = new ExtendedExchangeExtension(this);
privateExtension.setFromEndpoint(parent.getFromEndpoint());
privateExtension.setFromRouteId(parent.getFromRouteId());
+ privateExtension.setUnitOfWork(parent.getUnitOfWork());
}
public AbstractExchange(Endpoint fromEndpoint) {
@@ -678,69 +676,7 @@ class AbstractExchange implements Exchange {
@Override
public UnitOfWork getUnitOfWork() {
- return unitOfWork;
- }
-
- void setUnitOfWork(UnitOfWork unitOfWork) {
- this.unitOfWork = unitOfWork;
- if (unitOfWork != null && onCompletions != null) {
- // now an unit of work has been assigned so add the on completions
- // we might have registered already
- for (Synchronization onCompletion : onCompletions) {
- unitOfWork.addSynchronization(onCompletion);
- }
- // cleanup the temporary on completion list as they now have been registered
- // on the unit of work
- onCompletions.clear();
- onCompletions = null;
- }
- }
-
- void addOnCompletion(Synchronization onCompletion) {
- if (unitOfWork == null) {
- // unit of work not yet registered so we store the on completion temporary
- // until the unit of work is assigned to this exchange by the unit of work
- if (onCompletions == null) {
- onCompletions = new ArrayList<>();
- }
- onCompletions.add(onCompletion);
- } else {
- getUnitOfWork().addSynchronization(onCompletion);
- }
- }
-
- boolean containsOnCompletion(Synchronization onCompletion) {
- if (unitOfWork != null) {
- // if there is an unit of work then the completions is moved there
- return unitOfWork.containsSynchronization(onCompletion);
- } else {
- // check temporary completions if no unit of work yet
- return onCompletions != null && onCompletions.contains(onCompletion);
- }
- }
-
- void handoverCompletions(Exchange target) {
- if (onCompletions != null) {
- for (Synchronization onCompletion : onCompletions) {
- target.getExchangeExtension().addOnCompletion(onCompletion);
- }
- // cleanup the temporary on completion list as they have been handed over
- onCompletions.clear();
- onCompletions = null;
- } else if (unitOfWork != null) {
- // let unit of work handover
- unitOfWork.handoverSynchronization(target);
- }
- }
-
- List<Synchronization> handoverCompletions() {
- List<Synchronization> answer = null;
- if (onCompletions != null) {
- answer = new ArrayList<>(onCompletions);
- onCompletions.clear();
- onCompletions = null;
- }
- return answer;
+ return privateExtension.getUnitOfWork();
}
/**
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index 21ac295a2b7..958b6b92179 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -99,15 +99,9 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
out.reset();
this.out = null;
}
- if (this.unitOfWork != null) {
- this.unitOfWork.reset();
- }
this.exception = null;
// reset pattern to original
this.pattern = originalPattern;
- if (this.onCompletions != null) {
- this.onCompletions.clear();
- }
// do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again
this.externalRedelivered = null;
this.routeStop = false;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
index 8080af78e0c..f1f13fb139a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
@@ -17,6 +17,7 @@
package org.apache.camel.support;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -44,6 +45,8 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
private boolean interruptable = true;
private boolean interrupted;
private AsyncCallback defaultConsumerCallback; // optimize (do not reset)
+ private UnitOfWork unitOfWork;
+ private List<Synchronization> onCompletions;
ExtendedExchangeExtension(AbstractExchange exchange) {
this.exchange = exchange;
@@ -84,7 +87,16 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
@Override
public void addOnCompletion(Synchronization onCompletion) {
- this.exchange.addOnCompletion(onCompletion);
+ if (unitOfWork == null) {
+ // unit of work not yet registered so we store the on completion temporary
+ // until the unit of work is assigned to this exchange by the unit of work
+ if (onCompletions == null) {
+ onCompletions = new ArrayList<>();
+ }
+ onCompletions.add(onCompletion);
+ } else {
+ unitOfWork.addSynchronization(onCompletion);
+ }
}
@Override
@@ -119,17 +131,44 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
@Override
public void handoverCompletions(Exchange target) {
- this.exchange.handoverCompletions(target);
+ if (onCompletions != null) {
+ for (Synchronization onCompletion : onCompletions) {
+ target.getExchangeExtension().addOnCompletion(onCompletion);
+ }
+ // cleanup the temporary on completion list as they have been handed over
+ onCompletions.clear();
+ onCompletions = null;
+ } else if (unitOfWork != null) {
+ // let unit of work handover
+ unitOfWork.handoverSynchronization(target);
+ }
}
@Override
public List<Synchronization> handoverCompletions() {
- return this.exchange.handoverCompletions();
+ List<Synchronization> answer = null;
+ if (onCompletions != null) {
+ answer = new ArrayList<>(onCompletions);
+ onCompletions.clear();
+ onCompletions = null;
+ }
+ return answer;
}
@Override
public void setUnitOfWork(UnitOfWork unitOfWork) {
- this.exchange.setUnitOfWork(unitOfWork);
+ this.unitOfWork = unitOfWork;
+ if (unitOfWork != null && onCompletions != null) {
+ // now an unit of work has been assigned so add the on completions
+ // we might have registered already
+ for (Synchronization onCompletion : onCompletions) {
+ unitOfWork.addSynchronization(onCompletion);
+ }
+ // cleanup the temporary on completion list as they now have been registered
+ // on the unit of work
+ onCompletions.clear();
+ onCompletions = null;
+ }
}
@Override
@@ -189,7 +228,13 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
@Override
public boolean containsOnCompletion(Synchronization onCompletion) {
- return this.exchange.containsOnCompletion(onCompletion);
+ if (unitOfWork != null) {
+ // if there is an unit of work then the completions is moved there
+ return unitOfWork.containsSynchronization(onCompletion);
+ } else {
+ // check temporary completions if no unit of work yet
+ return onCompletions != null && onCompletions.contains(onCompletion);
+ }
}
@Override
@@ -253,7 +298,19 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
this.failureHandled = failureHandled;
}
+ public UnitOfWork getUnitOfWork() {
+ return unitOfWork;
+ }
+
public void reset() {
+ if (this.unitOfWork != null) {
+ this.unitOfWork.reset();
+ }
+
+ if (this.onCompletions != null) {
+ this.onCompletions.clear();
+ }
+
setHistoryNodeId(null);
setHistoryNodeLabel(null);
setTransacted(false);