You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/04 12:45:16 UTC
[camel] branch main updated: CAMEL-18578: camel-lra use JDK11 client
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9e648dc3455 CAMEL-18578: camel-lra use JDK11 client
9e648dc3455 is described below
commit 9e648dc3455ee2aec56bdbd7f58ce3d43c35ae69
Author: Croway <fe...@gmail.com>
AuthorDate: Mon Oct 3 12:19:49 2022 +0200
CAMEL-18578: camel-lra use JDK11 client
---
components/camel-lra/pom.xml | 7 --
.../org/apache/camel/service/lra/LRAClient.java | 120 +++++++++------------
.../camel/service/lra/AbstractLRATestSupport.java | 28 ++---
.../org/apache/camel/service/lra/LRAOptionsIT.java | 2 +-
.../org/apache/camel/model/SagaDefinition.java | 2 +-
5 files changed, 65 insertions(+), 94 deletions(-)
diff --git a/components/camel-lra/pom.xml b/components/camel-lra/pom.xml
index 45d82973329..b1b5d4c4840 100644
--- a/components/camel-lra/pom.xml
+++ b/components/camel-lra/pom.xml
@@ -40,7 +40,6 @@
</properties>
<dependencies>
-
<!-- requires camel-core -->
<dependency>
<groupId>org.apache.camel</groupId>
@@ -50,14 +49,8 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-saga</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.cxf</groupId>
- <artifactId>cxf-rt-rs-client</artifactId>
- </dependency>
-
<!-- test dependencies -->
-
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-undertow</artifactId>
diff --git a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
index 166193346de..04873f3998e 100644
--- a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
+++ b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
@@ -18,20 +18,16 @@ package org.apache.camel.service.lra;
import java.io.Closeable;
import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
import java.net.URL;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.util.ObjectHelper;
@@ -47,46 +43,43 @@ import static org.apache.camel.service.lra.LRAConstants.PARTICIPANT_PATH_COMPLET
public class LRAClient implements Closeable {
private final LRASagaService sagaService;
- private final Client client;
- private final WebTarget target;
+ private final HttpClient client;
+ private final String lraUrl;
public LRAClient(LRASagaService sagaService) {
this.sagaService = sagaService;
- this.client = ClientBuilder.newBuilder()
- // CAMEL-12204: disabled for compatibility with JAX-RS 2.0
- //.executorService(sagaService.getExecutorService())
- .build();
+ client = HttpClient.newHttpClient();
- this.target = client.target(
- new LRAUrlBuilder()
- .host(sagaService.getCoordinatorUrl())
- .path(sagaService.getCoordinatorContextPath())
- .build());
+ lraUrl = new LRAUrlBuilder()
+ .host(sagaService.getCoordinatorUrl())
+ .path(sagaService.getCoordinatorContextPath())
+ .build();
}
public CompletableFuture<URL> newLRA() {
- CompletableFuture<Response> future = new CompletableFuture<>();
- target.path(COORDINATOR_PATH_START)
- .request()
- .async()
- .post(Entity.text(""), callbackToCompletableFuture(future));
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(lraUrl + "/" + COORDINATOR_PATH_START))
+ .POST(HttpRequest.BodyPublishers.ofString(""))
+ .build();
+
+ CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
return future.thenApply(res -> {
// See if there's a location header containing the LRA URL
- String location = res.getHeaderString(HttpHeaders.LOCATION);
+ List<String> location = res.headers().map().get("Location");
if (ObjectHelper.isNotEmpty(location)) {
- return toURL(location);
+ return toURL(location.get(0));
}
// If there's no location header try the Long-Running-Action header, assuming there's only one present in the response
- List<Object> lraHeaders = res.getHeaders().get(Exchange.SAGA_LONG_RUNNING_ACTION);
+ List<String> lraHeaders = res.headers().map().get(Exchange.SAGA_LONG_RUNNING_ACTION);
if (ObjectHelper.isNotEmpty(lraHeaders) && lraHeaders.size() == 1) {
return toURL(lraHeaders.get(0));
}
// Fallback to reading the URL from the response body
- String responseBody = res.readEntity(String.class);
+ String responseBody = res.body();
if (ObjectHelper.isNotEmpty(responseBody)) {
return toURL(responseBody);
}
@@ -95,7 +88,7 @@ public class LRAClient implements Closeable {
});
}
- public CompletableFuture<Void> join(URL lra, LRASagaStep step) {
+ public CompletableFuture<Void> join(final URL lra, LRASagaStep step) {
return CompletableFuture.supplyAsync(() -> {
LRAUrlBuilder participantBaseUrl = new LRAUrlBuilder()
.host(sagaService.getLocalParticipantUrl())
@@ -112,23 +105,23 @@ public class LRAClient implements Closeable {
link.append(',');
link.append('<').append(completionURL).append('>').append("; rel=complete");
- WebTarget joinTarget = client.target(lra.toString());
+ String lraEndpoint = lra.toString();
if (step.getTimeoutInMilliseconds().isPresent()) {
- joinTarget = joinTarget.queryParam(HEADER_TIME_LIMIT, step.getTimeoutInMilliseconds().get());
+ lraEndpoint = lraEndpoint + "?" + HEADER_TIME_LIMIT + "=" + step.getTimeoutInMilliseconds().get();
}
-
- CompletableFuture<Response> future = new CompletableFuture<>();
- joinTarget.request()
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(lraEndpoint))
.header(HEADER_LINK, link.toString())
- .header(Exchange.SAGA_LONG_RUNNING_ACTION, lra)
- .async()
- .put(Entity.entity(link.toString(), MediaType.TEXT_PLAIN), callbackToCompletableFuture(future));
+ .header(Exchange.SAGA_LONG_RUNNING_ACTION, lra.toString())
+ .header("Content-Type", "text/plain")
+ .PUT(HttpRequest.BodyPublishers.ofString(link.toString()))
+ .build();
- return future;
+ return client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
}, sagaService.getExecutorService())
.thenCompose(Function.identity())
.thenApply(response -> {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+ if (response.statusCode() != HttpURLConnection.HTTP_OK) {
throw new RuntimeCamelException("Cannot join LRA");
}
@@ -137,15 +130,16 @@ public class LRAClient implements Closeable {
}
public CompletableFuture<Void> complete(URL lra) {
- CompletableFuture<Response> future = new CompletableFuture<>();
- client.target(lra.toString())
- .path(COORDINATOR_PATH_CLOSE)
- .request()
- .async()
- .put(Entity.entity("", MediaType.TEXT_PLAIN), callbackToCompletableFuture(future));
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(lra.toString() + "/" + COORDINATOR_PATH_CLOSE))
+ .header("Content-Type", "text/plain")
+ .PUT(HttpRequest.BodyPublishers.ofString(""))
+ .build();
+
+ CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
return future.thenApply(response -> {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+ if (response.statusCode() != HttpURLConnection.HTTP_OK) {
throw new RuntimeCamelException("Cannot complete LRA");
}
@@ -154,15 +148,16 @@ public class LRAClient implements Closeable {
}
public CompletableFuture<Void> compensate(URL lra) {
- CompletableFuture<Response> future = new CompletableFuture<>();
- client.target(lra.toString())
- .path(COORDINATOR_PATH_CANCEL)
- .request()
- .async()
- .put(Entity.entity("", MediaType.TEXT_PLAIN), callbackToCompletableFuture(future));
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(lra.toString() + "/" + COORDINATOR_PATH_CANCEL))
+ .header("Content-Type", "text/plain")
+ .PUT(HttpRequest.BodyPublishers.ofString(""))
+ .build();
+
+ CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
return future.thenApply(response -> {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+ if (response.statusCode() != HttpURLConnection.HTTP_OK) {
throw new RuntimeCamelException("Cannot compensate LRA");
}
@@ -170,20 +165,6 @@ public class LRAClient implements Closeable {
});
}
- private InvocationCallback<Response> callbackToCompletableFuture(CompletableFuture<Response> future) {
- return new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- future.complete(response);
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(throwable);
- }
- };
- }
-
private URL toURL(Object url) {
if (url == null) {
return null;
@@ -201,8 +182,5 @@ public class LRAClient implements Closeable {
@Override
public void close() throws IOException {
- if (client != null) {
- client.close();
- }
}
}
diff --git a/components/camel-lra/src/test/java/org/apache/camel/service/lra/AbstractLRATestSupport.java b/components/camel-lra/src/test/java/org/apache/camel/service/lra/AbstractLRATestSupport.java
index 106c6e35f23..7be4bc38b6b 100644
--- a/components/camel-lra/src/test/java/org/apache/camel/service/lra/AbstractLRATestSupport.java
+++ b/components/camel-lra/src/test/java/org/apache/camel/service/lra/AbstractLRATestSupport.java
@@ -17,11 +17,10 @@
package org.apache.camel.service.lra;
import java.io.IOException;
-import java.io.InputStream;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -47,12 +46,12 @@ public abstract class AbstractLRATestSupport extends CamelTestSupport {
private int activeLRAs;
@BeforeEach
- public void getActiveLRAs() throws IOException {
+ public void getActiveLRAs() throws IOException, InterruptedException {
this.activeLRAs = getNumberOfActiveLRAs();
}
@AfterEach
- public void checkActiveLRAs() throws IOException {
+ public void checkActiveLRAs() throws IOException, InterruptedException {
await().atMost(2, SECONDS).until(() -> getNumberOfActiveLRAs(), equalTo(activeLRAs));
assertEquals(activeLRAs, getNumberOfActiveLRAs(), "Some LRA have been left pending");
}
@@ -81,16 +80,17 @@ public abstract class AbstractLRATestSupport extends CamelTestSupport {
return sagaService;
}
- protected int getNumberOfActiveLRAs() throws IOException {
- Client client = ClientBuilder.newClient();
+ protected int getNumberOfActiveLRAs() throws IOException, InterruptedException {
+ HttpClient client = HttpClient.newHttpClient();
+
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(getCoordinatorURL() + "/lra-coordinator"))
+ .build();
- Response response = client.target(getCoordinatorURL() + "/lra-coordinator")
- .request()
- .accept("application/json")
- .get();
+ HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
ObjectMapper mapper = new ObjectMapper();
- JsonNode lras = mapper.readTree(InputStream.class.cast(response.getEntity()));
+ JsonNode lras = mapper.readTree(response.body());
return lras.size();
}
diff --git a/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAOptionsIT.java b/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAOptionsIT.java
index 736db907a58..a1794f18b60 100644
--- a/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAOptionsIT.java
+++ b/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAOptionsIT.java
@@ -89,7 +89,7 @@ public class LRAOptionsIT extends AbstractLRATestSupport {
from("direct:wrong-expression")
.saga()
- .option("id", simple("${10 / 0}"))
+ .option("id", simple("${body.pippo.pluto}"))
.to("log:info");
}
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/SagaDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/SagaDefinition.java
index fb7cdea843a..4121a4e02d1 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/SagaDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/SagaDefinition.java
@@ -259,7 +259,7 @@ public class SagaDefinition extends OutputDefinition<SagaDefinition> {
}
public SagaDefinition timeout(Duration duration) {
- return timeout(TimeUtils.printDuration(duration));
+ return timeout(TimeUtils.printDuration(duration, true));
}
public SagaDefinition timeout(long timeout, TimeUnit unit) {