You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by al...@apache.org on 2020/10/14 18:51:21 UTC

[camel-quarkus] branch master updated: Added nsq native support fixes #1722

This is an automated email from the ASF dual-hosted git repository.

aldettinger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/master by this push:
     new dd6c3e7  Added nsq native support fixes #1722
dd6c3e7 is described below

commit dd6c3e7d4d986263e16fd1aef80dceb78287ed59
Author: aldettinger <al...@gmail.com>
AuthorDate: Thu Oct 8 20:01:04 2020 +0200

    Added nsq native support fixes #1722
---
 .../ROOT/pages/reference/extensions/nsq.adoc       |   8 +-
 .../ROOT/partials/reference/components/nsq.adoc    |   6 +-
 extensions-jvm/pom.xml                             |   1 -
 .../nsq/deployment/pom.xml                         |   0
 .../component/nsq/deployment/NsqProcessor.java     |  22 ++--
 {extensions-jvm => extensions}/nsq/pom.xml         |   1 -
 {extensions-jvm => extensions}/nsq/runtime/pom.xml |  11 ++
 .../main/resources/META-INF/quarkus-extension.yaml |   3 +-
 extensions/pom.xml                                 |   1 +
 .../nsq}/pom.xml                                   |  66 ++++++++++-
 .../camel/quarkus/component/nsq/it/NsqLogger.java  |  27 +++--
 .../quarkus/component/nsq/it/NsqResource.java      |  41 +++++--
 .../camel/quarkus/component/nsq/it/NsqRoute.java   |  80 +++++++++++++
 .../camel/quarkus/component/nsq/it/NsqIT.java      |  16 +--
 .../camel/quarkus/component/nsq/it/NsqTest.java    | 129 +++++++++++++++++++++
 .../quarkus/component/nsq/it/NsqTestResource.java  | 110 ++++++++++++++++++
 integration-tests/pom.xml                          |   1 +
 tooling/scripts/test-categories.yaml               |   1 +
 18 files changed, 459 insertions(+), 65 deletions(-)

diff --git a/docs/modules/ROOT/pages/reference/extensions/nsq.adoc b/docs/modules/ROOT/pages/reference/extensions/nsq.adoc
index dcfd4a0..407e8df 100644
--- a/docs/modules/ROOT/pages/reference/extensions/nsq.adoc
+++ b/docs/modules/ROOT/pages/reference/extensions/nsq.adoc
@@ -2,15 +2,15 @@
 // This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page
 = NSQ
 :cq-artifact-id: camel-quarkus-nsq
-:cq-native-supported: false
-:cq-status: Preview
+:cq-native-supported: true
+:cq-status: Stable
 :cq-description: Send and receive messages from NSQ realtime distributed messaging platform.
 :cq-deprecated: false
 :cq-jvm-since: 1.1.0
-:cq-native-since: n/a
+:cq-native-since: 1.2.0
 
 [.badges]
-[.badge-key]##JVM since##[.badge-supported]##1.1.0## [.badge-key]##Native##[.badge-unsupported]##unsupported##
+[.badge-key]##JVM since##[.badge-supported]##1.1.0## [.badge-key]##Native since##[.badge-supported]##1.2.0##
 
 Send and receive messages from NSQ realtime distributed messaging platform.
 
diff --git a/docs/modules/ROOT/partials/reference/components/nsq.adoc b/docs/modules/ROOT/partials/reference/components/nsq.adoc
index 34e93e4..50cffb6 100644
--- a/docs/modules/ROOT/partials/reference/components/nsq.adoc
+++ b/docs/modules/ROOT/partials/reference/components/nsq.adoc
@@ -2,11 +2,11 @@
 // This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page
 :cq-artifact-id: camel-quarkus-nsq
 :cq-artifact-id-base: nsq
-:cq-native-supported: false
-:cq-status: Preview
+:cq-native-supported: true
+:cq-status: Stable
 :cq-deprecated: false
 :cq-jvm-since: 1.1.0
-:cq-native-since: n/a
+:cq-native-since: 1.2.0
 :cq-camel-part-name: nsq
 :cq-camel-part-title: NSQ
 :cq-camel-part-description: Send and receive messages from NSQ realtime distributed messaging platform.
diff --git a/extensions-jvm/pom.xml b/extensions-jvm/pom.xml
index 1ca96e3..8e13181 100644
--- a/extensions-jvm/pom.xml
+++ b/extensions-jvm/pom.xml
@@ -112,7 +112,6 @@
         <module>mybatis</module>
         <module>nagios</module>
         <module>nitrite</module>
-        <module>nsq</module>
         <module>ognl</module>
         <module>openstack</module>
         <module>optaplanner</module>
diff --git a/extensions-jvm/nsq/deployment/pom.xml b/extensions/nsq/deployment/pom.xml
similarity index 100%
rename from extensions-jvm/nsq/deployment/pom.xml
rename to extensions/nsq/deployment/pom.xml
diff --git a/extensions-jvm/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java b/extensions/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
similarity index 62%
rename from extensions-jvm/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
rename to extensions/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
index 6ca3923..222cf80 100644
--- a/extensions-jvm/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
+++ b/extensions/nsq/deployment/src/main/java/org/apache/camel/quarkus/component/nsq/deployment/NsqProcessor.java
@@ -16,17 +16,13 @@
  */
 package org.apache.camel.quarkus.component.nsq.deployment;
 
+import io.quarkus.deployment.annotations.BuildProducer;
 import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.annotations.ExecutionTime;
-import io.quarkus.deployment.annotations.Record;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.pkg.steps.NativeBuild;
-import org.apache.camel.quarkus.core.JvmOnlyRecorder;
-import org.jboss.logging.Logger;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
 
 class NsqProcessor {
 
-    private static final Logger LOG = Logger.getLogger(NsqProcessor.class);
     private static final String FEATURE = "camel-nsq";
 
     @BuildStep
@@ -34,13 +30,11 @@ class NsqProcessor {
         return new FeatureBuildItem(FEATURE);
     }
 
-    /**
-     * Remove this once this extension starts supporting the native mode.
-     */
-    @BuildStep(onlyIf = NativeBuild.class)
-    @Record(value = ExecutionTime.RUNTIME_INIT)
-    void warnJvmInNative(JvmOnlyRecorder recorder) {
-        JvmOnlyRecorder.warnJvmInNative(LOG, FEATURE); // warn at build time
-        recorder.warnJvmInNative(FEATURE); // warn at runtime
+    @BuildStep
+    void registerReflectiveClasses(BuildProducer<ReflectiveClassBuildItem> producer) {
+        producer.produce(new ReflectiveClassBuildItem(false, false, "org.apache.commons.pool2.impl.DefaultEvictionPolicy"));
+        producer.produce(new ReflectiveClassBuildItem(false, false, "org.apache.logging.log4j.message.ReusableMessageFactory"));
+        producer.produce(
+                new ReflectiveClassBuildItem(false, false, "org.apache.logging.log4j.message.DefaultFlowMessageFactory"));
     }
 }
diff --git a/extensions-jvm/nsq/pom.xml b/extensions/nsq/pom.xml
similarity index 97%
rename from extensions-jvm/nsq/pom.xml
rename to extensions/nsq/pom.xml
index 6ad3b96..bedf167 100644
--- a/extensions-jvm/nsq/pom.xml
+++ b/extensions/nsq/pom.xml
@@ -35,6 +35,5 @@
     <modules>
         <module>deployment</module>
         <module>runtime</module>
-        <module>integration-test</module>
     </modules>
 </project>
diff --git a/extensions-jvm/nsq/runtime/pom.xml b/extensions/nsq/runtime/pom.xml
similarity index 89%
rename from extensions-jvm/nsq/runtime/pom.xml
rename to extensions/nsq/runtime/pom.xml
index 260f59f..1fd7046 100644
--- a/extensions-jvm/nsq/runtime/pom.xml
+++ b/extensions/nsq/runtime/pom.xml
@@ -34,6 +34,7 @@
 
     <properties>
         <camel.quarkus.jvmSince>1.1.0</camel.quarkus.jvmSince>
+        <camel.quarkus.nativeSince>1.2.0</camel.quarkus.nativeSince>
     </properties>
 
     <dependencyManagement>
@@ -54,8 +55,18 @@
             <artifactId>camel-quarkus-core</artifactId>
         </dependency>
         <dependency>
+          <groupId>org.jboss.logmanager</groupId>
+          <artifactId>log4j2-jboss-logmanager</artifactId> 
+        </dependency>
+        <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-nsq</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 
diff --git a/extensions-jvm/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
similarity index 97%
rename from extensions-jvm/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
rename to extensions/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
index 85ade51..e9c0a64 100644
--- a/extensions-jvm/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/extensions/nsq/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -24,9 +24,8 @@
 name: "Camel NSQ"
 description: "Send and receive messages from NSQ realtime distributed messaging platform"
 metadata:
-  unlisted: true
   guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/nsq.html"
   categories:
   - "integration"
   status:
-  - "preview"
+  - "stable"
diff --git a/extensions/pom.xml b/extensions/pom.xml
index a061f30..7f32cc9 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -153,6 +153,7 @@
         <module>nats</module>
         <module>netty</module>
         <module>netty-http</module>
+        <module>nsq</module>
         <module>olingo4</module>
         <module>openapi-java</module>
         <module>opentracing</module>
diff --git a/extensions-jvm/nsq/integration-test/pom.xml b/integration-tests/nsq/pom.xml
similarity index 59%
rename from extensions-jvm/nsq/integration-test/pom.xml
rename to integration-tests/nsq/pom.xml
index deb5d93..7441c31 100644
--- a/extensions-jvm/nsq/integration-test/pom.xml
+++ b/integration-tests/nsq/pom.xml
@@ -23,13 +23,12 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.camel.quarkus</groupId>
-        <artifactId>camel-quarkus-build-parent-it</artifactId>
+        <artifactId>camel-quarkus-integration-tests</artifactId>
         <version>1.2.0-SNAPSHOT</version>
-        <relativePath>../../../poms/build-parent-it/pom.xml</relativePath>
     </parent>
 
-    <artifactId>camel-quarkus-nsq-integration-test</artifactId>
-    <name>Camel Quarkus :: NSQ :: Integration Test</name>
+    <artifactId>camel-quarkus-integration-test-nsq</artifactId>
+    <name>Camel Quarkus :: Integration Tests :: NSQ</name>
     <description>Integration tests for Camel Quarkus NSQ extension</description>
 
     <dependencyManagement>
@@ -50,6 +49,10 @@
             <artifactId>camel-quarkus-nsq</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-direct</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-resteasy</artifactId>
         </dependency>
@@ -65,6 +68,31 @@
             <artifactId>rest-assured</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-integration-testcontainers-support</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-direct-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
         <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
         <dependency>
@@ -97,4 +125,34 @@
             </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <profile>
+            <id>native</id>
+            <activation>
+                <property>
+                    <name>native</name>
+                </property>
+            </activation>
+            <properties>
+                <quarkus.package.type>native</quarkus.package.type>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git a/extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqLogger.java
similarity index 59%
copy from extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
copy to integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqLogger.java
index 0d5c893..797d941 100644
--- a/extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
+++ b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqLogger.java
@@ -16,19 +16,24 @@
  */
 package org.apache.camel.quarkus.component.nsq.it;
 
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-import org.junit.jupiter.api.Test;
+import org.jboss.logging.Logger;
 
-@QuarkusTest
-class NsqTest {
+/**
+ * Quarkus is not able to display logs in JVM mode when running NSQ surefire tests.
+ * This class is intended to work around this issue.
+ */
+public class NsqLogger {
+
+    public static void log(Logger logger, String format, Object... args) {
+        String log = String.format(format, args);
+        System.out.println(log);
+        logger.info(log);
+    }
 
-    @Test
-    public void loadComponentNsq() {
-        /* A simple autogenerated test */
-        RestAssured.get("/nsq/load/component/nsq")
-                .then()
-                .statusCode(200);
+    public static void log(org.slf4j.Logger logger, String format, Object... args) {
+        String log = String.format(format, args);
+        System.out.println(log);
+        logger.info(log);
     }
 
 }
diff --git a/extensions-jvm/nsq/integration-test/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
similarity index 55%
rename from extensions-jvm/nsq/integration-test/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
rename to integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
index 536bbc8..cbdd2cd 100644
--- a/extensions-jvm/nsq/integration-test/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
+++ b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqResource.java
@@ -16,36 +16,53 @@
  */
 package org.apache.camel.quarkus.component.nsq.it;
 
+import java.util.concurrent.ConcurrentHashMap;
+
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
+import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 
-import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
 import org.jboss.logging.Logger;
 
+import static org.apache.camel.quarkus.component.nsq.it.NsqLogger.log;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.MESSAGE_CHARSET;
+
 @Path("/nsq")
 @ApplicationScoped
 public class NsqResource {
 
     private static final Logger LOG = Logger.getLogger(NsqResource.class);
 
-    private static final String COMPONENT_NSQ = "nsq";
+    private final ConcurrentHashMap<String, String> nsqMessages = new ConcurrentHashMap<>();
+
     @Inject
-    CamelContext context;
+    ProducerTemplate template;
+
+    @Path("/send")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    public void send(String msg) {
+        log(LOG, "Invoking send(%s)", msg);
+        template.sendBody("direct:send", msg.getBytes(MESSAGE_CHARSET));
+    }
+
+    void logNsqMessage(String test, String msg) {
+        log(LOG, "Calling logNsqMessage(%s,%s)", test, msg);
+        nsqMessages.put(test, msg);
+    }
 
-    @Path("/load/component/nsq")
+    @Path("/get-messages/{test}")
     @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public Response loadComponentNsq() throws Exception {
-        /* This is an autogenerated test */
-        if (context.getComponent(COMPONENT_NSQ) != null) {
-            return Response.ok().build();
-        }
-        LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_NSQ);
-        return Response.status(500, COMPONENT_NSQ + " could not be loaded from the Camel context").build();
+    public String getNsqMessages(@PathParam("test") String test) {
+        log(LOG, "Calling getNsqMessages(%s)", test);
+        return nsqMessages.get(test);
     }
 }
diff --git a/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqRoute.java b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqRoute.java
new file mode 100644
index 0000000..cd390f3
--- /dev/null
+++ b/integration-tests/nsq/src/main/java/org/apache/camel/quarkus/component/nsq/it/NsqRoute.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.nsq.it;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.nsq.NsqConstants;
+import org.jboss.logging.Logger;
+
+import static org.apache.camel.quarkus.component.nsq.it.NsqLogger.log;
+
+@ApplicationScoped
+public class NsqRoute extends RouteBuilder {
+
+    private static final Logger LOG = Logger.getLogger(NsqRoute.class);
+
+    public static final String CONSUMER_HOST_CFG_KEY = "quarkus.camel.nsq.test.consumer-host";
+    public static final String CONSUMER_PORT_CFG_KEY = "quarkus.camel.nsq.test.consumer-port";
+
+    public static final String PRODUCER_HOST_CFG_KEY = "quarkus.camel.nsq.test.producer-host";
+    public static final String PRODUCER_PORT_CFG_KEY = "quarkus.camel.nsq.test.producer-port";
+
+    public static final String CONSUMER_TOPIC = "consumer-topic";
+    public static final String PRODUCER_TOPIC = "producer-topic";
+
+    public static final Charset MESSAGE_CHARSET = StandardCharsets.UTF_8;
+
+    @Inject
+    NsqResource resource;
+
+    @Override
+    public void configure() {
+
+        final String toUriFormat = "nsq://%s?servers={{%s}}:{{%s}}";
+        from("direct:send").toF(toUriFormat, PRODUCER_TOPIC, PRODUCER_HOST_CFG_KEY, PRODUCER_PORT_CFG_KEY);
+
+        final String fromUriFormat = "nsq://%s?servers={{%s}}:{{%s}}&lookupInterval=2000&autoFinish=false&requeueInterval=1000";
+        fromF(fromUriFormat, CONSUMER_TOPIC, CONSUMER_HOST_CFG_KEY, CONSUMER_PORT_CFG_KEY).process(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                byte[] messageBytes = exchange.getIn().getBody(byte[].class);
+                String messageText = new String(messageBytes, MESSAGE_CHARSET);
+                int attempts = exchange.getIn().getHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, Integer.class);
+
+                log(LOG, "Nsq consumer attempt %s to process \"%s\"", attempts, messageText);
+
+                if (messageText.contains("Requeue") && attempts < 3) {
+                    throw new Exception("Forced error");
+                }
+                if (attempts >= 3) {
+                    resource.logNsqMessage("testRequeue", messageText);
+                } else {
+                    resource.logNsqMessage("testConsumer", messageText);
+                }
+            }
+        });
+    }
+
+}
diff --git a/extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqIT.java
similarity index 71%
rename from extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
rename to integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqIT.java
index 0d5c893..f62b36b 100644
--- a/extensions-jvm/nsq/integration-test/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
+++ b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqIT.java
@@ -16,19 +16,9 @@
  */
 package org.apache.camel.quarkus.component.nsq.it;
 
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-import org.junit.jupiter.api.Test;
+import io.quarkus.test.junit.NativeImageTest;
 
-@QuarkusTest
-class NsqTest {
-
-    @Test
-    public void loadComponentNsq() {
-        /* A simple autogenerated test */
-        RestAssured.get("/nsq/load/component/nsq")
-                .then()
-                .statusCode(200);
-    }
+@NativeImageTest
+class NsqIT extends NsqTest {
 
 }
diff --git a/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
new file mode 100644
index 0000000..6e6d4f2
--- /dev/null
+++ b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.nsq.it;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.github.brainlag.nsq.NSQConsumer;
+import com.github.brainlag.nsq.NSQProducer;
+import com.github.brainlag.nsq.exceptions.NSQException;
+import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
+import com.github.brainlag.nsq.lookup.NSQLookup;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static io.restassured.RestAssured.given;
+import static org.apache.camel.quarkus.component.nsq.it.NsqLogger.log;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_HOST_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_PORT_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_TOPIC;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.MESSAGE_CHARSET;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_HOST_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_PORT_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_TOPIC;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@QuarkusTest
+@QuarkusTestResource(NsqTestResource.class)
+class NsqTest {
+
+    private static final Logger LOG = Logger.getLogger(NsqRoute.class);
+
+    private static final String TEST_CONSUMER_MSG = "Hello NSQConsumer !";
+    private static final String TEST_PRODUCER_MSG = "Hello NSQProducer !";
+    private static final String TEST_REQUEUE_MSG = "Test Requeue";
+
+    private static String CONSUMER_HOST, PRODUCER_HOST;
+    private static int CONDUMER_PORT, PRODUCER_PORT;
+
+    @BeforeAll
+    public static void setUp() {
+        CONSUMER_HOST = ConfigProvider.getConfig().getValue(CONSUMER_HOST_CFG_KEY, String.class);
+        CONDUMER_PORT = ConfigProvider.getConfig().getValue(CONSUMER_PORT_CFG_KEY, int.class);
+        PRODUCER_HOST = ConfigProvider.getConfig().getValue(PRODUCER_HOST_CFG_KEY, String.class);
+        PRODUCER_PORT = ConfigProvider.getConfig().getValue(PRODUCER_PORT_CFG_KEY, int.class);
+
+        log(LOG, "NsqTest.CONSUMER = %s:%s", CONSUMER_HOST, CONDUMER_PORT);
+        log(LOG, "NsqTest.PRODUCER = %s:%s", PRODUCER_HOST, PRODUCER_PORT);
+    }
+
+    @Test
+    void nsqProducerShouldSucceed() throws Exception {
+
+        CountDownLatch lock = new CountDownLatch(1);
+
+        given().body(TEST_PRODUCER_MSG).post("/nsq/send").then().statusCode(204);
+
+        AtomicInteger counter = new AtomicInteger(0);
+        NSQLookup lookup = new DefaultNSQLookup();
+        lookup.addLookupAddress(CONSUMER_HOST, CONDUMER_PORT);
+
+        try (NSQConsumer consumer = new NSQConsumer(lookup, PRODUCER_TOPIC, "testconsumer", message -> {
+            log(LOG, "The NSQConsumer from testProducer() received message %s", message);
+
+            counter.incrementAndGet();
+            message.finished();
+            lock.countDown();
+
+            assertEquals(TEST_PRODUCER_MSG, new String(message.getMessage(), MESSAGE_CHARSET));
+        })) {
+            consumer.start();
+
+            lock.await(10, TimeUnit.SECONDS);
+
+            assertEquals(1, counter.get());
+        }
+    }
+
+    @Test
+    void nsqConsumerShouldSucceed() throws NSQException, TimeoutException {
+        NSQProducer producer = new NSQProducer();
+        producer.addAddress(PRODUCER_HOST, PRODUCER_PORT);
+        producer.start();
+
+        producer.produce(CONSUMER_TOPIC, TEST_CONSUMER_MSG.getBytes(MESSAGE_CHARSET));
+
+        await().atMost(10L, TimeUnit.SECONDS).until(() -> {
+            return given().get("/nsq/get-messages/testConsumer").statusCode() == 200;
+        });
+        given().get("/nsq/get-messages/testConsumer").then().body(is(TEST_CONSUMER_MSG));
+    }
+
+    @Test
+    void nsqConsumerWithExceptionShouldRequeueMessagesThreeTimes() throws NSQException, TimeoutException {
+        NSQProducer producer = new NSQProducer();
+        producer.addAddress(PRODUCER_HOST, PRODUCER_PORT);
+        producer.start();
+
+        producer.produce(CONSUMER_TOPIC, TEST_REQUEUE_MSG.getBytes(MESSAGE_CHARSET));
+
+        await().atMost(10L, TimeUnit.SECONDS).until(() -> {
+            return given().get("/nsq/get-messages/testRequeue").statusCode() == 200;
+        });
+        given().get("/nsq/get-messages/testRequeue").then().body(is(TEST_REQUEUE_MSG));
+    }
+
+}
diff --git a/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTestResource.java b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTestResource.java
new file mode 100644
index 0000000..764b3ac
--- /dev/null
+++ b/integration-tests/nsq/src/test/java/org/apache/camel/quarkus/component/nsq/it/NsqTestResource.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.nsq.it;
+
+import java.util.Map;
+
+import org.apache.camel.quarkus.testcontainers.ContainerResourceLifecycleManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.TestcontainersConfiguration;
+
+import static org.apache.camel.quarkus.component.nsq.it.NsqLogger.log;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_HOST_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.CONSUMER_PORT_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_HOST_CFG_KEY;
+import static org.apache.camel.quarkus.component.nsq.it.NsqRoute.PRODUCER_PORT_CFG_KEY;
+import static org.apache.camel.util.CollectionHelper.mapOf;
+
+public class NsqTestResource implements ContainerResourceLifecycleManager {
+    private static final Logger LOG = LoggerFactory.getLogger(NsqTestResource.class);
+
+    public static final String CONTAINER_NSQ_IMAGE = "nsqio/nsq:v1.2.0";
+
+    public static final String CONTAINER_NSQLOOKUPD_NAME = "nsqlookupd";
+    public static final int CONTAINER_NSQLOOKUPD_TCP_PORT = 4160;
+    public static final int CONTAINER_NSQLOOKUPD_HTTP_PORT = 4161;
+
+    public static final String CONTAINER_NSQD_NAME = "nsqd";
+    public static final int CONTAINER_NSQD_TCP_PORT = 4150;
+
+    private GenericContainer nsqDaemonContainer, nsqLookupDaemonContainer;
+
+    @Override
+    public Map<String, String> start() {
+        log(LOG, "%s", TestcontainersConfiguration.getInstance().toString());
+
+        Network network = Network.newNetwork();
+
+        nsqLookupDaemonContainer = new FixedHostPortGenericContainer(CONTAINER_NSQ_IMAGE)
+                .withFixedExposedPort(CONTAINER_NSQLOOKUPD_HTTP_PORT, CONTAINER_NSQLOOKUPD_HTTP_PORT)
+                .withNetworkAliases(CONTAINER_NSQLOOKUPD_NAME)
+                .withCommand("/nsqlookupd")
+                .withNetwork(network)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(Wait.forLogMessage(".*TCP: listening on.*", 1));
+        nsqLookupDaemonContainer.start();
+
+        String nsqdCmdFormat = "/nsqd --broadcast-address=%s --lookupd-tcp-address=%s:%s";
+        String nsqdCmd = String.format(nsqdCmdFormat, "localhost", CONTAINER_NSQLOOKUPD_NAME, CONTAINER_NSQLOOKUPD_TCP_PORT);
+        nsqDaemonContainer = new FixedHostPortGenericContainer(CONTAINER_NSQ_IMAGE)
+                .withFixedExposedPort(CONTAINER_NSQD_TCP_PORT, CONTAINER_NSQD_TCP_PORT)
+                .withNetworkAliases(CONTAINER_NSQD_NAME)
+                .withCommand(nsqdCmd)
+                .withNetwork(network)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(Wait.forLogMessage(".*TCP: listening on.*", 1));
+        nsqDaemonContainer.start();
+
+        String nsqConsumerHost = nsqLookupDaemonContainer.getContainerIpAddress();
+        Integer nsqConsumerPort = nsqLookupDaemonContainer.getMappedPort(CONTAINER_NSQLOOKUPD_HTTP_PORT);
+
+        String nsqProducerHost = nsqDaemonContainer.getContainerIpAddress();
+        Integer nsqProducerPort = nsqDaemonContainer.getMappedPort(CONTAINER_NSQD_TCP_PORT);
+
+        return mapOf(CONSUMER_HOST_CFG_KEY, nsqConsumerHost, CONSUMER_PORT_CFG_KEY, "" + nsqConsumerPort,
+                PRODUCER_HOST_CFG_KEY, nsqProducerHost, PRODUCER_PORT_CFG_KEY, "" + nsqProducerPort);
+    }
+
+    @Override
+    public void stop() {
+        log(LOG, "Logs for nsqLookupContainer: %s", nsqLookupDaemonContainer.getLogs());
+        log(LOG, "Logs for nsqContainer: %s", nsqDaemonContainer.getLogs());
+
+        try {
+            if (nsqLookupDaemonContainer != null) {
+                nsqLookupDaemonContainer.stop();
+            }
+        } catch (Exception ex) {
+            log(LOG, "An issue occured while stopping nsqLookupContainer %s:", ex.getMessage());
+        }
+
+        try {
+            if (nsqDaemonContainer != null) {
+                nsqDaemonContainer.stop();
+            }
+        } catch (Exception ex) {
+            log(LOG, "An issue occured while stopping nsqContainer %s:", ex.getMessage());
+        }
+    }
+}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index a51902f..b135a0c 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -127,6 +127,7 @@
         <module>mustache</module>
         <module>nats</module>
         <module>netty</module>
+        <module>nsq</module>
         <module>olingo4</module>
         <module>openapi-java</module>
         <module>opentracing</module>
diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml
index 774f1e3..f171e5e 100644
--- a/tooling/scripts/test-categories.yaml
+++ b/tooling/scripts/test-categories.yaml
@@ -116,6 +116,7 @@ networking2-dataformats:
   - git
   - mail
   - netty
+  - nsq
   - send-dynamic-http
   - servlet
   - univocity-parsers