You are viewing a plain text version of this content. The canonical version is available in the commits@camel.apache.org mailing-list archive.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/04 12:45:16 UTC

[camel] branch main updated: CAMEL-18578: camel-lra use JDK11 client

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e648dc3455 CAMEL-18578: camel-lra use JDK11 client
9e648dc3455 is described below

commit 9e648dc3455ee2aec56bdbd7f58ce3d43c35ae69
Author: Croway <fe...@gmail.com>
AuthorDate: Mon Oct 3 12:19:49 2022 +0200

    CAMEL-18578: camel-lra use JDK11 client
---
 components/camel-lra/pom.xml                       |   7 --
 .../org/apache/camel/service/lra/LRAClient.java    | 120 +++++++++------------
 .../camel/service/lra/AbstractLRATestSupport.java  |  28 ++---
 .../org/apache/camel/service/lra/LRAOptionsIT.java |   2 +-
 .../org/apache/camel/model/SagaDefinition.java     |   2 +-
 5 files changed, 65 insertions(+), 94 deletions(-)

diff --git a/components/camel-lra/pom.xml b/components/camel-lra/pom.xml
index 45d82973329..b1b5d4c4840 100644
--- a/components/camel-lra/pom.xml
+++ b/components/camel-lra/pom.xml
@@ -40,7 +40,6 @@
     </properties>
 
     <dependencies>
-
         <!-- requires camel-core -->
         <dependency>
             <groupId>org.apache.camel</groupId>
@@ -50,14 +49,8 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-saga</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.cxf</groupId>
-            <artifactId>cxf-rt-rs-client</artifactId>
-        </dependency>
-
 
         <!-- test dependencies -->
-
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-undertow</artifactId>
diff --git a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
index 166193346de..04873f3998e 100644
--- a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
+++ b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
@@ -18,20 +18,16 @@ package org.apache.camel.service.lra;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URL;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.util.ObjectHelper;
@@ -47,46 +43,43 @@ import static org.apache.camel.service.lra.LRAConstants.PARTICIPANT_PATH_COMPLET
 public class LRAClient implements Closeable {
 
     private final LRASagaService sagaService;
-    private final Client client;
-    private final WebTarget target;
+    private final HttpClient client;
+    private final String lraUrl;
 
     public LRAClient(LRASagaService sagaService) {
         this.sagaService = sagaService;
 
-        this.client = ClientBuilder.newBuilder()
-                // CAMEL-12204: disabled for compatibility with JAX-RS 2.0
-                //.executorService(sagaService.getExecutorService())
-                .build();
+        client = HttpClient.newHttpClient();
 
-        this.target = client.target(
-                new LRAUrlBuilder()
-                        .host(sagaService.getCoordinatorUrl())
-                        .path(sagaService.getCoordinatorContextPath())
-                        .build());
+        lraUrl = new LRAUrlBuilder()
+                .host(sagaService.getCoordinatorUrl())
+                .path(sagaService.getCoordinatorContextPath())
+                .build();
     }
 
     public CompletableFuture<URL> newLRA() {
-        CompletableFuture<Response> future = new CompletableFuture<>();
-        target.path(COORDINATOR_PATH_START)
-                .request()
-                .async()
-                .post(Entity.text(""), callbackToCompletableFuture(future));
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create(lraUrl + "/" + COORDINATOR_PATH_START))
+                .POST(HttpRequest.BodyPublishers.ofString(""))
+                .build();
+
+        CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
 
         return future.thenApply(res -> {
             // See if there's a location header containing the LRA URL
-            String location = res.getHeaderString(HttpHeaders.LOCATION);
+            List<String> location = res.headers().map().get("Location");
             if (ObjectHelper.isNotEmpty(location)) {
-                return toURL(location);
+                return toURL(location.get(0));
             }
 
             // If there's no location header try the Long-Running-Action header, assuming there's only one present in the response
-            List<Object> lraHeaders = res.getHeaders().get(Exchange.SAGA_LONG_RUNNING_ACTION);
+            List<String> lraHeaders = res.headers().map().get(Exchange.SAGA_LONG_RUNNING_ACTION);
             if (ObjectHelper.isNotEmpty(lraHeaders) && lraHeaders.size() == 1) {
                 return toURL(lraHeaders.get(0));
             }
 
             // Fallback to reading the URL from the response body
-            String responseBody = res.readEntity(String.class);
+            String responseBody = res.body();
             if (ObjectHelper.isNotEmpty(responseBody)) {
                 return toURL(responseBody);
             }
@@ -95,7 +88,7 @@ public class LRAClient implements Closeable {
         });
     }
 
-    public CompletableFuture<Void> join(URL lra, LRASagaStep step) {
+    public CompletableFuture<Void> join(final URL lra, LRASagaStep step) {
         return CompletableFuture.supplyAsync(() -> {
             LRAUrlBuilder participantBaseUrl = new LRAUrlBuilder()
                     .host(sagaService.getLocalParticipantUrl())
@@ -112,23 +105,23 @@ public class LRAClient implements Closeable {
             link.append(',');
             link.append('<').append(completionURL).append('>').append("; rel=complete");
 
-            WebTarget joinTarget = client.target(lra.toString());
+            String lraEndpoint = lra.toString();
             if (step.getTimeoutInMilliseconds().isPresent()) {
-                joinTarget = joinTarget.queryParam(HEADER_TIME_LIMIT, step.getTimeoutInMilliseconds().get());
+                lraEndpoint = lraEndpoint + "?" + HEADER_TIME_LIMIT + "=" + step.getTimeoutInMilliseconds().get();
             }
-
-            CompletableFuture<Response> future = new CompletableFuture<>();
-            joinTarget.request()
+            HttpRequest request = HttpRequest.newBuilder()
+                    .uri(URI.create(lraEndpoint))
                     .header(HEADER_LINK, link.toString())
-                    .header(Exchange.SAGA_LONG_RUNNING_ACTION, lra)
-                    .async()
-                    .put(Entity.entity(link.toString(), MediaType.TEXT_PLAIN), callbackToCompletableFuture(future));
+                    .header(Exchange.SAGA_LONG_RUNNING_ACTION, lra.toString())
+                    .header("Content-Type", "text/plain")
+                    .PUT(HttpRequest.BodyPublishers.ofString(link.toString()))
+                    .build();
 
-            return future;
+            return client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
         }, sagaService.getExecutorService())
                 .thenCompose(Function.identity())
                 .thenApply(response -> {
-                    if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+                    if (response.statusCode() != HttpURLConnection.HTTP_OK) {
                         throw new RuntimeCamelException("Cannot join LRA");
                     }
 
@@ -137,15 +130,16 @@ public class LRAClient implements Closeable {
     }
 
     public CompletableFuture<Void> complete(URL lra) {
-        CompletableFuture<Response> future = new CompletableFuture<>();
-        client.target(lra.toString())
-                .path(COORDINATOR_PATH_CLOSE)
-                .request()
-                .async()
-                .put(Entity.entity("", MediaType.TEXT_PLAIN), callbackToCompletableFuture(future));
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create(lra.toString() + "/" + COORDINATOR_PATH_CLOSE))
+                .header("Content-Type", "text/plain")
+                .PUT(HttpRequest.BodyPublishers.ofString(""))
+                .build();
+
+        CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
 
         return future.thenApply(response -> {
-            if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+            if (response.statusCode() != HttpURLConnection.HTTP_OK) {
                 throw new RuntimeCamelException("Cannot complete LRA");
             }
 
@@ -154,15 +148,16 @@ public class LRAClient implements Closeable {
     }
 
     public CompletableFuture<Void> compensate(URL lra) {
-        CompletableFuture<Response> future = new CompletableFuture<>();
-        client.target(lra.toString())
-                .path(COORDINATOR_PATH_CANCEL)
-                .request()
-                .async()
-                .put(Entity.entity("", MediaType.TEXT_PLAIN), callbackToCompletableFuture(future));
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create(lra.toString() + "/" + COORDINATOR_PATH_CANCEL))
+                .header("Content-Type", "text/plain")
+                .PUT(HttpRequest.BodyPublishers.ofString(""))
+                .build();
+
+        CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
 
         return future.thenApply(response -> {
-            if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+            if (response.statusCode() != HttpURLConnection.HTTP_OK) {
                 throw new RuntimeCamelException("Cannot compensate LRA");
             }
 
@@ -170,20 +165,6 @@ public class LRAClient implements Closeable {
         });
     }
 
-    private InvocationCallback<Response> callbackToCompletableFuture(CompletableFuture<Response> future) {
-        return new InvocationCallback<Response>() {
-            @Override
-            public void completed(Response response) {
-                future.complete(response);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(throwable);
-            }
-        };
-    }
-
     private URL toURL(Object url) {
         if (url == null) {
             return null;
@@ -201,8 +182,5 @@ public class LRAClient implements Closeable {
 
     @Override
     public void close() throws IOException {
-        if (client != null) {
-            client.close();
-        }
     }
 }
diff --git a/components/camel-lra/src/test/java/org/apache/camel/service/lra/AbstractLRATestSupport.java b/components/camel-lra/src/test/java/org/apache/camel/service/lra/AbstractLRATestSupport.java
index 106c6e35f23..7be4bc38b6b 100644
--- a/components/camel-lra/src/test/java/org/apache/camel/service/lra/AbstractLRATestSupport.java
+++ b/components/camel-lra/src/test/java/org/apache/camel/service/lra/AbstractLRATestSupport.java
@@ -17,11 +17,10 @@
 package org.apache.camel.service.lra;
 
 import java.io.IOException;
-import java.io.InputStream;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -47,12 +46,12 @@ public abstract class AbstractLRATestSupport extends CamelTestSupport {
     private int activeLRAs;
 
     @BeforeEach
-    public void getActiveLRAs() throws IOException {
+    public void getActiveLRAs() throws IOException, InterruptedException {
         this.activeLRAs = getNumberOfActiveLRAs();
     }
 
     @AfterEach
-    public void checkActiveLRAs() throws IOException {
+    public void checkActiveLRAs() throws IOException, InterruptedException {
         await().atMost(2, SECONDS).until(() -> getNumberOfActiveLRAs(), equalTo(activeLRAs));
         assertEquals(activeLRAs, getNumberOfActiveLRAs(), "Some LRA have been left pending");
     }
@@ -81,16 +80,17 @@ public abstract class AbstractLRATestSupport extends CamelTestSupport {
         return sagaService;
     }
 
-    protected int getNumberOfActiveLRAs() throws IOException {
-        Client client = ClientBuilder.newClient();
+    protected int getNumberOfActiveLRAs() throws IOException, InterruptedException {
+        HttpClient client = HttpClient.newHttpClient();
+
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create(getCoordinatorURL() + "/lra-coordinator"))
+                .build();
 
-        Response response = client.target(getCoordinatorURL() + "/lra-coordinator")
-                .request()
-                .accept("application/json")
-                .get();
+        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
 
         ObjectMapper mapper = new ObjectMapper();
-        JsonNode lras = mapper.readTree(InputStream.class.cast(response.getEntity()));
+        JsonNode lras = mapper.readTree(response.body());
         return lras.size();
     }
 
diff --git a/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAOptionsIT.java b/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAOptionsIT.java
index 736db907a58..a1794f18b60 100644
--- a/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAOptionsIT.java
+++ b/components/camel-lra/src/test/java/org/apache/camel/service/lra/LRAOptionsIT.java
@@ -89,7 +89,7 @@ public class LRAOptionsIT extends AbstractLRATestSupport {
 
                 from("direct:wrong-expression")
                         .saga()
-                        .option("id", simple("${10 / 0}"))
+                        .option("id", simple("${body.pippo.pluto}"))
                         .to("log:info");
 
             }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/SagaDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/SagaDefinition.java
index fb7cdea843a..4121a4e02d1 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/SagaDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/SagaDefinition.java
@@ -259,7 +259,7 @@ public class SagaDefinition extends OutputDefinition<SagaDefinition> {
     }
 
     public SagaDefinition timeout(Duration duration) {
-        return timeout(TimeUtils.printDuration(duration));
+        return timeout(TimeUtils.printDuration(duration, true));
     }
 
     public SagaDefinition timeout(long timeout, TimeUnit unit) {