You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/11/08 08:39:46 UTC

[camel] 04/11: camel-nsq component can automatically Finish messages.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ccddc0762de135c3d638723b2149848c937a13e6
Author: mionker <mi...@icloud.com>
AuthorDate: Sun Nov 4 17:36:10 2018 +0100

    camel-nsq component can automatically Finish messages.
---
 .../camel/component/nsq/NsqConfiguration.java      | 26 ++++++++++++
 .../apache/camel/component/nsq/NsqConsumer.java    | 49 +++++++++++++++++-----
 .../apache/camel/component/nsq/NsqEndpoint.java    | 10 ++++-
 .../camel/component/nsq/NsqSynchronization.java    | 26 ++++++++++++
 .../camel/component/nsq/NsqConsumerTest.java       | 34 +++++++++++++--
 5 files changed, 130 insertions(+), 15 deletions(-)

diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
index 2c01f99..9cdd5c1 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
@@ -36,6 +36,10 @@ public class NsqConfiguration {
     private long requeueInterval = 0;
     @UriParam(label = "security")
     private SSLContextParameters sslContextParameters;
+    @UriParam(label = "consumer", defaultValue = "true", description = "Automatically finish the NSQ Message when it is retrievd from the queue and before the Exchange is processed.")
+    private Boolean autoFinish = true;
+    @UriParam(label = "consumer", description = "The NSQ consumer timeout period for messages retrieved from the queue.")
+    private long messageTimeout;
 
     /*
      * URL a NSQ lookup server hostname.
@@ -161,6 +165,28 @@ public class NsqConfiguration {
         this.sslContextParameters = sslContextParameters;
     }
 
+    /**
+     * Automatically finish the NSQ message when it is retrieved from the quese and before the Exchange is processed.
+     */
+    public Boolean getAutoFinish() {
+        return autoFinish;
+    }
+
+    public void setAutoFinish(Boolean autoFinish) {
+        this.autoFinish = autoFinish;
+    }
+
+    /**
+     * The NSQ message timeout for a consumer.
+     */
+    public long getMessageTimeout() {
+        return messageTimeout;
+    }
+
+    public void setMessageTimeout(long messageTimeout) {
+        this.messageTimeout = messageTimeout;
+    }
+
     private String splitServers() {
         StringBuilder servers = new StringBuilder();
 
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
index 6402fe5..68faf37 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
@@ -7,8 +7,10 @@ import com.github.brainlag.nsq.callbacks.NSQMessageCallback;
 import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
 import com.github.brainlag.nsq.lookup.NSQLookup;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.spi.Synchronization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,12 +27,12 @@ public class NsqConsumer extends DefaultConsumer {
     private ExecutorService executor;
     private boolean active;
     NSQConsumer consumer;
-    private final NsqConfiguration config;
+    private final NsqConfiguration configuration;
 
     public NsqConsumer(NsqEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.processor = processor;
-        this.config = getEndpoint().getNsqConfiguration();
+        this.configuration = getEndpoint().getNsqConfiguration();
     }
 
     @Override
@@ -47,14 +49,14 @@ public class NsqConsumer extends DefaultConsumer {
         LOG.debug("Getting NSQ Connection");
         NSQLookup lookup = new DefaultNSQLookup();
 
-        for(ServerAddress server : config.getServerAddresses()) {
+        for(ServerAddress server : configuration.getServerAddresses()) {
             lookup.addLookupAddress(server.getHost(),
-                    server.getPort() == 0 ? config.getLookupServerPort() : server.getPort());
+                    server.getPort() == 0 ? configuration.getLookupServerPort() : server.getPort());
         }
 
-        consumer = new NSQConsumer(lookup, config.getTopic(),
-                config.getChannel(), new CamelNsqMessageHandler());
-        consumer.setLookupPeriod(config.getLookupInterval());
+        consumer = new NSQConsumer(lookup, configuration.getTopic(),
+                configuration.getChannel(), new CamelNsqMessageHandler(), getEndpoint().getNsqConfig());
+        consumer.setLookupPeriod(configuration.getLookupInterval());
         consumer.setExecutor(getEndpoint().createExecutor());
         consumer.start();
     }
@@ -84,18 +86,45 @@ public class NsqConsumer extends DefaultConsumer {
             @Override
             public void message(NSQMessage msg) {
                 LOG.debug("Received Message: {}", msg);
-                Exchange exchange = getEndpoint().createExchange();
+                Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
                 exchange.getIn().setBody(msg.getMessage());
                 exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID, msg.getId());
                 exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, msg.getAttempts());
                 exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP, msg.getTimestamp());
                 try {
+                    if (configuration.getAutoFinish()) {
+                        msg.finished();
+                    } else {
+                        exchange.addOnCompletion(new NsqSynchronization(msg, (int) configuration.getRequeueInterval()));
+                    }
                     processor.process(exchange);
-                    msg.finished();
                 } catch (Exception e) {
-                    msg.requeue((int) config.getRequeueInterval());
+                    if (!configuration.getAutoFinish()) {
+                        msg.requeue((int) configuration.getRequeueInterval());
+                    }
                     getExceptionHandler().handleException("Error during processing", exchange, e);
                 }
             }
         }
+
+    class Sync implements Synchronization {
+
+        @Override
+        public void onComplete(final Exchange exchange) {
+            try {
+                //msg.finished();
+            } catch (Exception e) {
+                LOG.error(String.format("Could not run completion of exchange %s", exchange), e);
+            }
+        }
+
+        @Override
+        public void onFailure(final Exchange exchange) {
+            try {
+                //msg.requeue((int) config.getRequeueInterval());
+            } catch (Exception e) {
+                LOG.error(String.format("Could not run failure of exchange %s", exchange), e);
+            }
+        }
+    }
 }
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
index e792316..53c04ac 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
@@ -50,8 +50,14 @@ public class NsqEndpoint extends DefaultEndpoint {
 
     public NSQConfig getNsqConfig() throws GeneralSecurityException, IOException {
         NSQConfig nsqConfig = new NSQConfig();
-        SslContext sslContext = new JdkSslContext(getNsqConfiguration().getSslContextParameters().createSSLContext(getCamelContext()), true, null);
-        nsqConfig.setSslContext(sslContext);
+        if (getNsqConfiguration().getSslContextParameters() != null) {
+            SslContext sslContext = new JdkSslContext(getNsqConfiguration().getSslContextParameters().createSSLContext(getCamelContext()), true, null);
+            nsqConfig.setSslContext(sslContext);
+
+        }
+        if (configuration.getMessageTimeout() > 0) {
+            nsqConfig.setMsgTimeout((int) configuration.getMessageTimeout());
+        }
 
         return nsqConfig;
     }
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqSynchronization.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqSynchronization.java
new file mode 100644
index 0000000..58635ee
--- /dev/null
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqSynchronization.java
@@ -0,0 +1,26 @@
+package org.apache.camel.component.nsq;
+
+import com.github.brainlag.nsq.NSQMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.SynchronizationAdapter;
+
+public class NsqSynchronization extends SynchronizationAdapter {
+
+    private final NSQMessage nsqMessage;
+    private final int requeueInterval;
+
+    public NsqSynchronization(NSQMessage nsqMessage, int requeueInterval) {
+        this.nsqMessage = nsqMessage;
+        this.requeueInterval = requeueInterval;
+    }
+
+    @Override
+    public void onComplete(Exchange exchange) {
+        nsqMessage.finished();
+    }
+
+    @Override
+    public void onFailure(Exchange exchange) {
+        nsqMessage.requeue(requeueInterval);
+    }
+}
diff --git a/components/camel-nsq/src/test/java/org/apache/camel/component/nsq/NsqConsumerTest.java b/components/camel-nsq/src/test/java/org/apache/camel/component/nsq/NsqConsumerTest.java
index fca0ea3..7c15fdd 100644
--- a/components/camel-nsq/src/test/java/org/apache/camel/component/nsq/NsqConsumerTest.java
+++ b/components/camel-nsq/src/test/java/org/apache/camel/component/nsq/NsqConsumerTest.java
@@ -3,6 +3,8 @@ package org.apache.camel.component.nsq;
 import com.github.brainlag.nsq.NSQProducer;
 import com.github.brainlag.nsq.exceptions.NSQException;
 import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
@@ -12,6 +14,7 @@ import java.util.concurrent.TimeoutException;
 public class NsqConsumerTest extends NsqTestSupport {
 
     private static final int NUMBER_OF_MESSAGES = 10000;
+    private static final String TOPIC = "test";
 
     @EndpointInject(uri = "mock:result")
     protected MockEndpoint mockResultEndpoint;
@@ -25,7 +28,7 @@ public class NsqConsumerTest extends NsqTestSupport {
         producer.addAddress("localhost", 4150);
         producer.start();
 
-        producer.produce("test", ("Hello NSQ!").getBytes());
+        producer.produce(TOPIC, ("Hello NSQ!").getBytes());
 
         mockResultEndpoint.assertIsSatisfied();
 
@@ -42,18 +45,43 @@ public class NsqConsumerTest extends NsqTestSupport {
         producer.start();
 
         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
-            producer.produce("test", ("test" + i).getBytes());
+            producer.produce(TOPIC, (String.format("Hello NSQ%d!", i)).getBytes());
         }
 
         mockResultEndpoint.assertIsSatisfied();
     }
 
+    @Test
+    public void testRequeue() throws NSQException, TimeoutException, InterruptedException {
+        mockResultEndpoint.setExpectedMessageCount(1);
+        mockResultEndpoint.setAssertPeriod(5000);
+
+        NSQProducer producer = new NSQProducer();
+        producer.addAddress("localhost", 4150);
+        producer.start();
+
+        producer.produce(TOPIC, ("Test Requeue").getBytes());
+
+        mockResultEndpoint.assertIsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                fromF("nsq://%s?topic=%s&lookupInterval=5s", getNsqConsumerUrl(), "test").to(mockResultEndpoint);
+                fromF("nsq://%s?topic=%s&lookupInterval=2s&autoFinish=false&requeueInterval=1s", getNsqConsumerUrl(), TOPIC)
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                String messageText = exchange.getIn().getBody(String.class);
+                                int attempts = exchange.getIn().getHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, Integer.class);
+                                if (messageText.contains("Requeue") && attempts < 3) {
+                                    throw new Exception();
+                                }
+                            }
+                        })
+                        .to(mockResultEndpoint);
             }
         };
     }