You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pp...@apache.org on 2022/04/13 07:56:20 UTC
[camel-quarkus] branch main updated: fixup 501833 Fix Aws2KinesisTest.kinesis test #3638
This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new ed2e547078 fixup 501833 Fix Aws2KinesisTest.kinesis test #3638
ed2e547078 is described below
commit ed2e54707875da34633c5aaa9aa3e8e46415bb55
Author: Peter Palaga <pp...@redhat.com>
AuthorDate: Tue Apr 12 23:50:57 2022 +0200
fixup 501833 Fix Aws2KinesisTest.kinesis test #3638
---
.../aws2/kinesis/it/Aws2KinesisResource.java | 17 +++----
.../aws2/kinesis/it/Aws2KinesisRoutes.java | 59 ++++++++++++++++++++++
.../component/aws2/kinesis/it/Aws2KinesisTest.java | 13 +++--
3 files changed, 75 insertions(+), 14 deletions(-)
diff --git a/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisResource.java b/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisResource.java
index d5f6be4f2c..d6afdd04fd 100644
--- a/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisResource.java
+++ b/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisResource.java
@@ -16,12 +16,13 @@
*/
package org.apache.camel.quarkus.component.aws2.kinesis.it;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
+import java.util.Queue;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
+import javax.inject.Named;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -30,15 +31,17 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.jboss.logging.Logger;
@Path("/aws2-kinesis")
@ApplicationScoped
public class Aws2KinesisResource {
+ private static final Logger log = Logger.getLogger(Aws2KinesisResource.class);
+
@ConfigProperty(name = "aws-kinesis.stream-name")
String streamName;
@@ -46,7 +49,8 @@ public class Aws2KinesisResource {
ProducerTemplate producerTemplate;
@Inject
- ConsumerTemplate consumerTemplate;
+ @Named("aws2KinesisMessages")
+ Queue<String> aws2KinesisMessages;
@Path("/send")
@POST
@@ -69,12 +73,7 @@ public class Aws2KinesisResource {
@GET
@Produces(MediaType.TEXT_PLAIN)
public String receive() throws IOException {
- try (ByteArrayInputStream result = consumerTemplate.receiveBody(componentUri(), 10000, ByteArrayInputStream.class)) {
- if (result == null) {
- return null;
- }
- return new String(result.readAllBytes());
- }
+ return aws2KinesisMessages.poll();
}
private String componentUri() {
diff --git a/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisRoutes.java b/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisRoutes.java
new file mode 100644
index 0000000000..ffce077e08
--- /dev/null
+++ b/integration-test-groups/aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisRoutes.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.kinesis.it;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Singleton;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@ApplicationScoped
+public class Aws2KinesisRoutes extends RouteBuilder {
+
+ @ConfigProperty(name = "aws-kinesis.stream-name")
+ String streamName;
+
+ @Inject
+ @Named("aws2KinesisMessages")
+ Queue<String> aws2KinesisMessages;
+
+ private String componentUri() {
+ return "aws2-kinesis://" + streamName;
+ }
+
+ @Override
+ public void configure() throws Exception {
+ from(componentUri())
+ .process(exchange -> aws2KinesisMessages.add(exchange.getMessage().getBody(String.class)));
+ }
+
+ static class Producers {
+ @Singleton
+ @javax.enterprise.inject.Produces
+ @Named("aws2KinesisMessages")
+ Queue<String> aws2KinesisMessages() {
+ return new ConcurrentLinkedDeque<>();
+ }
+ }
+
+}
diff --git a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java b/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
index cfc8a4358b..2cce2d2f24 100644
--- a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
+++ b/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
@@ -30,7 +30,6 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
-import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
@@ -61,10 +60,14 @@ class Aws2KinesisTest {
.then()
.statusCode(201);
- RestAssured.get("/aws2-kinesis/receive")
- .then()
- .statusCode(200)
- .body(Matchers.is(msg));
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(
+ () -> RestAssured.get("/aws2-kinesis/receive").then().extract(),
+ response -> {
+ final int status = response.statusCode();
+ final String body = status == 200 ? response.body().asString() : null;
+ LOG.info("Got " + status + " " + body);
+ return response.statusCode() == 200 && msg.equals(body);
+ });
}
@Test