You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pp...@apache.org on 2022/04/13 07:56:20 UTC

[camel-quarkus] branch main updated: fixup 501833 Fix Aws2KinesisTest.kinesis test #3638

This is an automated email from the ASF dual-hosted git repository.

ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new ed2e547078 fixup 501833 Fix Aws2KinesisTest.kinesis test #3638
ed2e547078 is described below

commit ed2e54707875da34633c5aaa9aa3e8e46415bb55
Author: Peter Palaga <pp...@redhat.com>
AuthorDate: Tue Apr 12 23:50:57 2022 +0200

    fixup 501833 Fix Aws2KinesisTest.kinesis test #3638
---
 .../aws2/kinesis/it/Aws2KinesisResource.java       | 17 +++----
 .../aws2/kinesis/it/Aws2KinesisRoutes.java         | 59 ++++++++++++++++++++++
 .../component/aws2/kinesis/it/Aws2KinesisTest.java | 13 +++--
 3 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisResource.java b/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisResource.java
index d5f6be4f2c..d6afdd04fd 100644
--- a/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisResource.java
+++ b/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisResource.java
@@ -16,12 +16,13 @@
  */
 package org.apache.camel.quarkus.component.aws2.kinesis.it;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Queue;
 
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
+import javax.inject.Named;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -30,15 +31,17 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.jboss.logging.Logger;
 
 @Path("/aws2-kinesis")
 @ApplicationScoped
 public class Aws2KinesisResource {
 
+    private static final Logger log = Logger.getLogger(Aws2KinesisResource.class);
+
     @ConfigProperty(name = "aws-kinesis.stream-name")
     String streamName;
 
@@ -46,7 +49,8 @@ public class Aws2KinesisResource {
     ProducerTemplate producerTemplate;
 
     @Inject
-    ConsumerTemplate consumerTemplate;
+    @Named("aws2KinesisMessages")
+    Queue<String> aws2KinesisMessages;
 
     @Path("/send")
     @POST
@@ -69,12 +73,7 @@ public class Aws2KinesisResource {
     @GET
     @Produces(MediaType.TEXT_PLAIN)
     public String receive() throws IOException {
-        try (ByteArrayInputStream result = consumerTemplate.receiveBody(componentUri(), 10000, ByteArrayInputStream.class)) {
-            if (result == null) {
-                return null;
-            }
-            return new String(result.readAllBytes());
-        }
+        return aws2KinesisMessages.poll();
     }
 
     private String componentUri() {
diff --git a/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisRoutes.java b/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisRoutes.java
new file mode 100644
index 0000000000..ffce077e08
--- /dev/null
+++ b/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisRoutes.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.kinesis.it;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Singleton;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@ApplicationScoped
+public class Aws2KinesisRoutes extends RouteBuilder {
+
+    @ConfigProperty(name = "aws-kinesis.stream-name")
+    String streamName;
+
+    @Inject
+    @Named("aws2KinesisMessages")
+    Queue<String> aws2KinesisMessages;
+
+    private String componentUri() {
+        return "aws2-kinesis://" + streamName;
+    }
+
+    @Override
+    public void configure() throws Exception {
+        from(componentUri())
+                .process(exchange -> aws2KinesisMessages.add(exchange.getMessage().getBody(String.class)));
+    }
+
+    static class Producers {
+        @Singleton
+        @javax.enterprise.inject.Produces
+        @Named("aws2KinesisMessages")
+        Queue<String> aws2KinesisMessages() {
+            return new ConcurrentLinkedDeque<>();
+        }
+    }
+
+}
diff --git a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java b/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
index cfc8a4358b..2cce2d2f24 100644
--- a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
+++ b/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
@@ -30,7 +30,6 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.awaitility.Awaitility;
 import org.eclipse.microprofile.config.Config;
 import org.eclipse.microprofile.config.ConfigProvider;
-import org.hamcrest.Matchers;
 import org.jboss.logging.Logger;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.localstack.LocalStackContainer.Service;
@@ -61,10 +60,14 @@ class Aws2KinesisTest {
                 .then()
                 .statusCode(201);
 
-        RestAssured.get("/aws2-kinesis/receive")
-                .then()
-                .statusCode(200)
-                .body(Matchers.is(msg));
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(
+                () -> RestAssured.get("/aws2-kinesis/receive").then().extract(),
+                response -> {
+                    final int status = response.statusCode();
+                    final String body = status == 200 ? response.body().asString() : null;
+                    LOG.info("Got " + status + " " + body);
+                    return response.statusCode() == 200 && msg.equals(body);
+                });
     }
 
     @Test