You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by al...@apache.org on 2020/10/14 18:51:21 UTC
[camel-quarkus] branch master updated: Added nsq native support
fixes #1722
This is an automated email from the ASF dual-hosted git repository.
aldettinger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push:
new dd6c3e7 Added nsq native support fixes #1722
dd6c3e7 is described below
commit dd6c3e7d4d986263e16fd1aef80dceb78287ed59
Author: aldettinger <al...@gmail.com>
AuthorDate: Thu Oct 8 20:01:04 2020 +0200
Added nsq native support fixes #1722
---
.../ROOT/pages/reference/extensions/nsq.adoc | 8 +-
.../ROOT/partials/reference/components/nsq.adoc | 6 +-
extensions-jvm/pom.xml | 1 -
.../nsq/deployment/pom.xml | 0
.../component/nsq/deployment/NsqProcessor.java | 22 ++--
{extensions-jvm => extensions}/nsq/pom.xml | 1 -
{extensions-jvm => extensions}/nsq/runtime/pom.xml | 11 ++
.../main/resources/META-INF/quarkus-extension.yaml | 3 +-
extensions/pom.xml | 1 +
.../nsq}/pom.xml | 66 ++++++++++-
.../camel/quarkus/component/nsq/it/NsqLogger.java | 27 +++--
.../quarkus/component/nsq/it/NsqResource.java | 41 +++++--
.../camel/quarkus/component/nsq/it/NsqRoute.java | 80 +++++++++++++
.../camel/quarkus/component/nsq/it/NsqIT.java | 16 +--
.../camel/quarkus/component/nsq/it/NsqTest.java | 129 +++++++++++++++++++++
.../quarkus/component/nsq/it/NsqTestResource.java | 110 ++++++++++++++++++
integration-tests/pom.xml | 1 +
tooling/scripts/test-categories.yaml | 1 +
18 files changed, 459 insertions(+), 65 deletions(-)
diff --git a/docs/modules/ROOT/pages/reference/extensions/nsq.adoc b/docs/modules/ROOT/pages/reference/extensions/nsq.adoc
index dcfd4a0..407e8df 100644
--- a/docs/modules/ROOT/pages/reference/extensions/nsq.adoc
+++ b/docs/modules/ROOT/pages/reference/extensions/nsq.adoc
@@ -2,15 +2,15 @@
// This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page
= NSQ
:cq-artifact-id: camel-quarkus-nsq
-:cq-native-supported: false
-:cq-status: Preview
+:cq-native-supported: true
+:cq-status: Stable
:cq-description: Send and receive messages from NSQ realtime distributed messaging platform.
:cq-deprecated: false
:cq-jvm-since: 1.1.0
-:cq-native-since: n/a
+:cq-native-since: 1.2.0
[.badges]
-[.badge-key]##JVM since##[.badge-supported]##1.1.0## [.badge-key]##Native##[.badge-unsupported]##unsupported##
+[.badge-key]##JVM since##[.badge-supported]##1.1.0## [.badge-key]##Native since##[.badge-supported]##1.2.0##
Send and receive messages from NSQ realtime distributed messaging platform.
diff --git a/docs/modules/ROOT/partials/reference/components/nsq.adoc b/docs/modules/ROOT/partials/reference/components/nsq.adoc
index 34e93e4..50cffb6 100644
--- a/docs/modules/ROOT/partials/reference/components/nsq.adoc
+++ b/docs/modules/ROOT/partials/reference/components/nsq.adoc
@@ -2,11 +2,11 @@
// This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page
:cq-artifact-id: camel-quarkus-nsq
:cq-artifact-id-base: nsq
-:cq-native-supported: false
-:cq-status: Preview
+:cq-native-supported: true
+:cq-status: Stable
:cq-deprecated: false
:cq-jvm-since: 1.1.0
-:cq-native-since: n/a
+:cq-native-since: 1.2.0
:cq-camel-part-name: nsq
:cq-camel-part-title: NSQ
:cq-camel-part-description: Send and receive messages from NSQ realtime distributed messaging platform.
diff --git a/extensions-jvm/pom.xml b/extensions-jvm/pom.xml
index 1ca96e3..8e13181 100644
--- a/extensions-jvm/pom.xml
+++ b/extensions-jvm/pom.xml
@@ -112,7 +112,6 @@
<module>mybatis</module>
<module>nagios</module>
<module>nitrite</module>
- <module>nsq</module>
<module>ognl</module>
<module>openstack</module>
<module>optaplanner</module>
diff --git a/extensions-jvm/nsq/deployment/pom.xml b/extensions/nsq/deployment/pom.xml
similarity index 100%
rename from extensions-jvm/nsq/deployment/pom.xml
rename to extensions/nsq/deployment/pom.xml
diff --git a/extensions-jvm/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java b/extensions/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
similarity index 62%
rename from extensions-jvm/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
rename to extensions/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
index 6ca3923..222cf80 100644
--- a/extensions-jvm/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
+++ b/extensions/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
@@ -16,17 +16,13 @@
*/
package org.apache.camel.quarkus.component.nsq.deployment;
+import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.annotations.ExecutionTime;
-import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.pkg.steps.NativeBuild;
-import org.apache.camel.quarkus.core.JvmOnlyRecorder;
-import org.jboss.logging.Logger;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
class NsqProcessor {
- private static final Logger LOG = Logger.getLogger(NsqProcessor.class);
private static final String FEATURE = "camel-nsq";
@BuildStep
@@ -34,13 +30,11 @@ class NsqProcessor {
return new FeatureBuildItem(FEATURE);
}
- /**
- * Remove this once this extension starts supporting the native mode.
- */
- @BuildStep(onlyIf = NativeBuild.class)
- @Record(value = ExecutionTime.RUNTIME_INIT)
- void warnJvmInNative(JvmOnlyRecorder recorder) {
- JvmOnlyRecorder.warnJvmInNative(LOG, FEATURE); // warn at build time
- recorder.warnJvmInNative(FEATURE); // warn at runtime
+ @BuildStep
+ void registerReflectiveClasses(BuildProducer<ReflectiveClassBuildItem> producer) {
+ producer.produce(new ReflectiveClassBuildItem(false, false, "org.apache.commons.pool2.impl.DefaultEvictionPolicy"));
+ producer.produce(new ReflectiveClassBuildItem(false, false, "org.apache.logging.log4j.message.ReusableMessageFactory"));
+ producer.produce(
+ new ReflectiveClassBuildItem(false, false, "org.apache.logging.log4j.message.DefaultFlowMessageFactory"));
}
}
diff --git a/extensions-jvm/nsq/pom.xml b/extensions/nsq/pom.xml
similarity index 97%
rename from extensions-jvm/nsq/pom.xml
rename to extensions/nsq/pom.xml
index 6ad3b96..bedf167 100644
--- a/extensions-jvm/nsq/pom.xml
+++ b/extensions/nsq/pom.xml
@@ -35,6 +35,5 @@
<modules>
<module>deployment</module>
<module>runtime</module>
- <module>integration-test</module>
</modules>
</project>
diff --git a/extensions-jvm/nsq/runtime/pom.xml b/extensions/nsq/runtime/pom.xml
similarity index 89%
rename from extensions-jvm/nsq/runtime/pom.xml
rename to extensions/nsq/runtime/pom.xml
index 260f59f..1fd7046 100644
--- a/extensions-jvm/nsq/runtime/pom.xml
+++ b/extensions/nsq/runtime/pom.xml
@@ -34,6 +34,7 @@
<properties>
<camel.quarkus.jvmSince>1.1.0</camel.quarkus.jvmSince>
+ <camel.quarkus.nativeSince>1.2.0</camel.quarkus.nativeSince>
</properties>
<dependencyManagement>
@@ -54,8 +55,18 @@
<artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.jboss.logmanager</groupId>
+ <artifactId>log4j2-jboss-logmanager</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-nsq</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
diff --git a/extensions-jvm/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
similarity index 97%
rename from extensions-jvm/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
rename to extensions/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
index 85ade51..e9c0a64 100644
--- a/extensions-jvm/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/extensions/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -24,9 +24,8 @@
name: "Camel NSQ"
description: "Send and receive messages from NSQ realtime distributed messaging platform"
metadata:
- unlisted: true
guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/nsq.html"
categories:
- "integration"
status:
- - "preview"
+ - "stable"
diff --git a/extensions/pom.xml b/extensions/pom.xml
index a061f30..7f32cc9 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -153,6 +153,7 @@
<module>nats</module>
<module>netty</module>
<module>netty-http</module>
+ <module>nsq</module>
<module>olingo4</module>
<module>openapi-java</module>
<module>opentracing</module>
diff --git a/extensions-jvm/nsq/integration-test/pom.xml b/integration-tests/nsq/pom.xml
similarity index 59%
rename from extensions-jvm/nsq/integration-test/pom.xml
rename to integration-tests/nsq/pom.xml
index deb5d93..7441c31 100644
--- a/extensions-jvm/nsq/integration-test/pom.xml
+++ b/integration-tests/nsq/pom.xml
@@ -23,13 +23,12 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-build-parent-it</artifactId>
+ <artifactId>camel-quarkus-integration-tests</artifactId>
<version>1.2.0-SNAPSHOT</version>
- <relativePath>../../../poms/build-parent-it/pom.xml</relativePath>
</parent>
- <artifactId>camel-quarkus-nsq-integration-test</artifactId>
- <name>Camel Quarkus :: NSQ :: Integration Test</name>
+ <artifactId>camel-quarkus-integration-test-nsq</artifactId>
+ <name>Camel Quarkus :: Integration Tests :: NSQ</name>
<description>Integration tests for Camel Quarkus NSQ extension</description>
<dependencyManagement>
@@ -50,6 +49,10 @@
<artifactId>camel-quarkus-nsq</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
@@ -65,6 +68,31 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-integration-testcontainers-support</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
@@ -97,4 +125,34 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>native</id>
+ <activation>
+ <property>
+ <name>native</name>
+ </property>
+ </activation>
+ <properties>
+ <quarkus.package.type>native</quarkus.package.type>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqLogger.java
similarity index 59%
copy from extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
copy to integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqLogger.java
index 0d5c893..797d941 100644
--- a/extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
+++ b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqLogger.java
@@ -16,19 +16,24 @@
*/
package org.apache.camel.quarkus.component.nsq.it;
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-import org.junit.jupiter.api.Test;
+import org.jboss.logging.Logger;
-@QuarkusTest
-class NsqTest {
+/**
+ * Quarkus is not able to display logs in JVM mode when running NSQ surefire tests.
+ * This class intends to work around this issue.
+ */
+public class NsqLogger {
+
+ public static void log(Logger logger, String format, Object... args) {
+ String log = String.format(format, args);
+ System.out.println(log);
+ logger.info(log);
+ }
- @Test
- public void loadComponentNsq() {
- /* A simple autogenerated test */
- RestAssured.get("/nsq/load/component/nsq")
- .then()
- .statusCode(200);
+ public static void log(org.slf4j.Logger logger, String format, Object... args) {
+ String log = String.format(format, args);
+ System.out.println(log);
+ logger.info(log);
}
}
diff --git a/extensions-jvm/nsq/integration-test/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
similarity index 55%
rename from extensions-jvm/nsq/integration-test/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
rename to integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
index 536bbc8..cbdd2cd 100644
--- a/extensions-jvm/nsq/integration-test/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
+++ b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
@@ -16,36 +16,53 @@
*/
package org.apache.camel.quarkus.component.nsq.it;
+import java.util.concurrent.ConcurrentHashMap;
+
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
+import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
import org.jboss.logging.Logger;
+import static org.apache.camel.quarkus.component.nsq.it.NsqLogger.log;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.MESSAGE_CHARSET;
+
@Path("/nsq")
@ApplicationScoped
public class NsqResource {
private static final Logger LOG = Logger.getLogger(NsqResource.class);
- private static final String COMPONENT_NSQ = "nsq";
+ private final ConcurrentHashMap<String, String> nsqMessages = new ConcurrentHashMap<>();
+
@Inject
- CamelContext context;
+ ProducerTemplate template;
+
+ @Path("/send")
+ @POST
+ @Consumes(MediaType.TEXT_PLAIN)
+ public void send(String msg) {
+ log(LOG, "Invoking send(%s)", msg);
+ template.sendBody("direct:send", msg.getBytes(MESSAGE_CHARSET));
+ }
+
+ void logNsqMessage(String test, String msg) {
+ log(LOG, "Calling logNsqMessage(%s,%s)", test, msg);
+ nsqMessages.put(test, msg);
+ }
- @Path("/load/component/nsq")
+ @Path("/get-messages/{test}")
@GET
@Produces(MediaType.TEXT_PLAIN)
- public Response loadComponentNsq() throws Exception {
- /* This is an autogenerated test */
- if (context.getComponent(COMPONENT_NSQ) != null) {
- return Response.ok().build();
- }
- LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_NSQ);
- return Response.status(500, COMPONENT_NSQ + " could not be loaded from the Camel context").build();
+ public String getNsqMessages(@PathParam("test") String test) {
+ log(LOG, "Calling getNsqMessages(%s)", test);
+ return nsqMessages.get(test);
}
}
diff --git a/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqRoute.java b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqRoute.java
new file mode 100644
index 0000000..cd390f3
--- /dev/null
+++ b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqRoute.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.nsq.it;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.nsq.NsqConstants;
+import org.jboss.logging.Logger;
+
+import static org.apache.camel.quarkus.component.nsq.it.NsqLogger.log;
+
+@ApplicationScoped
+public class NsqRoute extends RouteBuilder {
+
+ private static final Logger LOG = Logger.getLogger(NsqRoute.class);
+
+ public static final String CONSUMER_HOST_CFG_KEY = "quarkus.camel.nsq.test.consumer-host";
+ public static final String CONSUMER_PORT_CFG_KEY = "quarkus.camel.nsq.test.consumer-port";
+
+ public static final String PRODUCER_HOST_CFG_KEY = "quarkus.camel.nsq.test.producer-host";
+ public static final String PRODUCER_PORT_CFG_KEY = "quarkus.camel.nsq.test.producer-port";
+
+ public static final String CONSUMER_TOPIC = "consumer-topic";
+ public static final String PRODUCER_TOPIC = "producer-topic";
+
+ public static final Charset MESSAGE_CHARSET = StandardCharsets.UTF_8;
+
+ @Inject
+ NsqResource resource;
+
+ @Override
+ public void configure() {
+
+ final String toUriFormat = "nsq://%s?servers={{%s}}:{{%s}}";
+ from("direct:send").toF(toUriFormat, PRODUCER_TOPIC, PRODUCER_HOST_CFG_KEY, PRODUCER_PORT_CFG_KEY);
+
+ final String fromUriFormat = "nsq://%s?servers={{%s}}:{{%s}}&lookupInterval=2000&autoFinish=false&requeueInterval=1000";
+ fromF(fromUriFormat, CONSUMER_TOPIC, CONSUMER_HOST_CFG_KEY, CONSUMER_PORT_CFG_KEY).process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ byte[] messageBytes = exchange.getIn().getBody(byte[].class);
+ String messageText = new String(messageBytes, MESSAGE_CHARSET);
+ int attempts = exchange.getIn().getHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, Integer.class);
+
+ log(LOG, "Nsq consumer attempt %s to process \"%s\"", attempts, messageText);
+
+ if (messageText.contains("Requeue") && attempts < 3) {
+ throw new Exception("Forced error");
+ }
+ if (attempts >= 3) {
+ resource.logNsqMessage("testRequeue", messageText);
+ } else {
+ resource.logNsqMessage("testConsumer", messageText);
+ }
+ }
+ });
+ }
+
+}
diff --git a/extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqIT.java
similarity index 71%
rename from extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
rename to integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqIT.java
index 0d5c893..f62b36b 100644
--- a/extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
+++ b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqIT.java
@@ -16,19 +16,9 @@
*/
package org.apache.camel.quarkus.component.nsq.it;
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-import org.junit.jupiter.api.Test;
+import io.quarkus.test.junit.NativeImageTest;
-@QuarkusTest
-class NsqTest {
-
- @Test
- public void loadComponentNsq() {
- /* A simple autogenerated test */
- RestAssured.get("/nsq/load/component/nsq")
- .then()
- .statusCode(200);
- }
+@NativeImageTest
+class NsqIT extends NsqTest {
}
diff --git a/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
new file mode 100644
index 0000000..6e6d4f2
--- /dev/null
+++ b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.nsq.it;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.github.brainlag.nsq.NSQConsumer;
+import com.github.brainlag.nsq.NSQProducer;
+import com.github.brainlag.nsq.exceptions.NSQException;
+import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
+import com.github.brainlag.nsq.lookup.NSQLookup;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static io.restassured.RestAssured.given;
+import static org.apache.camel.quarkus.component.nsq.it.NsqLogger.log;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_HOST_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_PORT_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_TOPIC;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.MESSAGE_CHARSET;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_HOST_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_PORT_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_TOPIC;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@QuarkusTest
+@QuarkusTestResource(NsqTestResource.class)
+class NsqTest {
+
+ private static final Logger LOG = Logger.getLogger(NsqRoute.class);
+
+ private static final String TEST_CONSUMER_MSG = "Hello NSQConsumer !";
+ private static final String TEST_PRODUCER_MSG = "Hello NSQProducer !";
+ private static final String TEST_REQUEUE_MSG = "Test Requeue";
+
+ private static String CONSUMER_HOST, PRODUCER_HOST;
+ private static int CONDUMER_PORT, PRODUCER_PORT;
+
+ @BeforeAll
+ public static void setUp() {
+ CONSUMER_HOST = ConfigProvider.getConfig().getValue(CONSUMER_HOST_CFG_KEY, String.class);
+ CONDUMER_PORT = ConfigProvider.getConfig().getValue(CONSUMER_PORT_CFG_KEY, int.class);
+ PRODUCER_HOST = ConfigProvider.getConfig().getValue(PRODUCER_HOST_CFG_KEY, String.class);
+ PRODUCER_PORT = ConfigProvider.getConfig().getValue(PRODUCER_PORT_CFG_KEY, int.class);
+
+ log(LOG, "NsqTest.CONSUMER = %s:%s", CONSUMER_HOST, CONDUMER_PORT);
+ log(LOG, "NsqTest.PRODUCER = %s:%s", PRODUCER_HOST, PRODUCER_PORT);
+ }
+
+ @Test
+ void nsqProducerShouldSucceed() throws Exception {
+
+ CountDownLatch lock = new CountDownLatch(1);
+
+ given().body(TEST_PRODUCER_MSG).post("/nsq/send").then().statusCode(204);
+
+ AtomicInteger counter = new AtomicInteger(0);
+ NSQLookup lookup = new DefaultNSQLookup();
+ lookup.addLookupAddress(CONSUMER_HOST, CONDUMER_PORT);
+
+ try (NSQConsumer consumer = new NSQConsumer(lookup, PRODUCER_TOPIC, "testconsumer", message -> {
+ log(LOG, "The NSQConsumer from testProducer() received message %s", message);
+
+ counter.incrementAndGet();
+ message.finished();
+ lock.countDown();
+
+ assertEquals(TEST_PRODUCER_MSG, new String(message.getMessage(), MESSAGE_CHARSET));
+ })) {
+ consumer.start();
+
+ lock.await(10, TimeUnit.SECONDS);
+
+ assertEquals(1, counter.get());
+ }
+ }
+
+ @Test
+ void nsqConsumerShouldSucceed() throws NSQException, TimeoutException {
+ NSQProducer producer = new NSQProducer();
+ producer.addAddress(PRODUCER_HOST, PRODUCER_PORT);
+ producer.start();
+
+ producer.produce(CONSUMER_TOPIC, TEST_CONSUMER_MSG.getBytes(MESSAGE_CHARSET));
+
+ await().atMost(10L, TimeUnit.SECONDS).until(() -> {
+ return given().get("/nsq/get-messages/testConsumer").statusCode() == 200;
+ });
+ given().get("/nsq/get-messages/testConsumer").then().body(is(TEST_CONSUMER_MSG));
+ }
+
+ @Test
+ void nsqConsumerWithExceptionShouldRequeueMessagesThreeTimes() throws NSQException, TimeoutException {
+ NSQProducer producer = new NSQProducer();
+ producer.addAddress(PRODUCER_HOST, PRODUCER_PORT);
+ producer.start();
+
+ producer.produce(CONSUMER_TOPIC, TEST_REQUEUE_MSG.getBytes(MESSAGE_CHARSET));
+
+ await().atMost(10L, TimeUnit.SECONDS).until(() -> {
+ return given().get("/nsq/get-messages/testRequeue").statusCode() == 200;
+ });
+ given().get("/nsq/get-messages/testRequeue").then().body(is(TEST_REQUEUE_MSG));
+ }
+
+}
diff --git a/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTestResource.java b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTestResource.java
new file mode 100644
index 0000000..764b3ac
--- /dev/null
+++ b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTestResource.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.nsq.it;
+
+import java.util.Map;
+
+import org.apache.camel.quarkus.testcontainers.ContainerResourceLifecycleManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.TestcontainersConfiguration;
+
+import static org.apache.camel.quarkus.component.nsq.it.NsqLogger.log;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_HOST_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_PORT_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_HOST_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_PORT_CFG_KEY;
+import static org.apache.camel.util.CollectionHelper.mapOf;
+
+public class NsqTestResource implements ContainerResourceLifecycleManager {
+ private static final Logger LOG = LoggerFactory.getLogger(NsqTestResource.class);
+
+ public static final String CONTAINER_NSQ_IMAGE = "nsqio/nsq:v1.2.0";
+
+ public static final String CONTAINER_NSQLOOKUPD_NAME = "nsqlookupd";
+ public static final int CONTAINER_NSQLOOKUPD_TCP_PORT = 4160;
+ public static final int CONTAINER_NSQLOOKUPD_HTTP_PORT = 4161;
+
+ public static final String CONTAINER_NSQD_NAME = "nsqd";
+ public static final int CONTAINER_NSQD_TCP_PORT = 4150;
+
+ private GenericContainer nsqDaemonContainer, nsqLookupDaemonContainer;
+
+ @Override
+ public Map<String, String> start() {
+ log(LOG, "%s", TestcontainersConfiguration.getInstance().toString());
+
+ Network network = Network.newNetwork();
+
+ nsqLookupDaemonContainer = new FixedHostPortGenericContainer(CONTAINER_NSQ_IMAGE)
+ .withFixedExposedPort(CONTAINER_NSQLOOKUPD_HTTP_PORT, CONTAINER_NSQLOOKUPD_HTTP_PORT)
+ .withNetworkAliases(CONTAINER_NSQLOOKUPD_NAME)
+ .withCommand("/nsqlookupd")
+ .withNetwork(network)
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .waitingFor(Wait.forLogMessage(".*TCP: listening on.*", 1));
+ nsqLookupDaemonContainer.start();
+
+ String nsqdCmdFormat = "/nsqd --broadcast-address=%s --lookupd-tcp-address=%s:%s";
+ String nsqdCmd = String.format(nsqdCmdFormat, "localhost", CONTAINER_NSQLOOKUPD_NAME, CONTAINER_NSQLOOKUPD_TCP_PORT);
+ nsqDaemonContainer = new FixedHostPortGenericContainer(CONTAINER_NSQ_IMAGE)
+ .withFixedExposedPort(CONTAINER_NSQD_TCP_PORT, CONTAINER_NSQD_TCP_PORT)
+ .withNetworkAliases(CONTAINER_NSQD_NAME)
+ .withCommand(nsqdCmd)
+ .withNetwork(network)
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .waitingFor(Wait.forLogMessage(".*TCP: listening on.*", 1));
+ nsqDaemonContainer.start();
+
+ String nsqConsumerHost = nsqLookupDaemonContainer.getContainerIpAddress();
+ Integer nsqConsumerPort = nsqLookupDaemonContainer.getMappedPort(CONTAINER_NSQLOOKUPD_HTTP_PORT);
+
+ String nsqProducerHost = nsqDaemonContainer.getContainerIpAddress();
+ Integer nsqProducerPort = nsqDaemonContainer.getMappedPort(CONTAINER_NSQD_TCP_PORT);
+
+ return mapOf(CONSUMER_HOST_CFG_KEY, nsqConsumerHost, CONSUMER_PORT_CFG_KEY, "" + nsqConsumerPort,
+ PRODUCER_HOST_CFG_KEY, nsqProducerHost, PRODUCER_PORT_CFG_KEY, "" + nsqProducerPort);
+ }
+
+ @Override
+ public void stop() {
+ log(LOG, "Logs for nsqLookupContainer: %s", nsqLookupDaemonContainer.getLogs());
+ log(LOG, "Logs for nsqContainer: %s", nsqDaemonContainer.getLogs());
+
+ try {
+ if (nsqLookupDaemonContainer != null) {
+ nsqLookupDaemonContainer.stop();
+ }
+ } catch (Exception ex) {
+ log(LOG, "An issue occured while stopping nsqLookupContainer %s:", ex.getMessage());
+ }
+
+ try {
+ if (nsqDaemonContainer != null) {
+ nsqDaemonContainer.stop();
+ }
+ } catch (Exception ex) {
+ log(LOG, "An issue occured while stopping nsqContainer %s:", ex.getMessage());
+ }
+ }
+}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index a51902f..b135a0c 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -127,6 +127,7 @@
<module>mustache</module>
<module>nats</module>
<module>netty</module>
+ <module>nsq</module>
<module>olingo4</module>
<module>openapi-java</module>
<module>opentracing</module>
diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml
index 774f1e3..f171e5e 100644
--- a/tooling/scripts/test-categories.yaml
+++ b/tooling/scripts/test-categories.yaml
@@ -116,6 +116,7 @@ networking2-dataformats:
- git
- mail
- netty
+ - nsq
- send-dynamic-http
- servlet
- univocity-parsers