You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/11/08 08:39:46 UTC
[camel] 04/11: camel-nsq component can automatically Finish
messages.
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit ccddc0762de135c3d638723b2149848c937a13e6
Author: mionker <mi...@icloud.com>
AuthorDate: Sun Nov 4 17:36:10 2018 +0100
camel-nsq component can automatically Finish messages.
---
.../camel/component/nsq/NsqConfiguration.java | 26 ++++++++++++
.../apache/camel/component/nsq/NsqConsumer.java | 49 +++++++++++++++++-----
.../apache/camel/component/nsq/NsqEndpoint.java | 10 ++++-
.../camel/component/nsq/NsqSynchronization.java | 26 ++++++++++++
.../camel/component/nsq/NsqConsumerTest.java | 34 +++++++++++++--
5 files changed, 130 insertions(+), 15 deletions(-)
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
index 2c01f99..9cdd5c1 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConfiguration.java
@@ -36,6 +36,10 @@ public class NsqConfiguration {
private long requeueInterval = 0;
@UriParam(label = "security")
private SSLContextParameters sslContextParameters;
+ @UriParam(label = "consumer", defaultValue = "true", description = "Automatically finish the NSQ Message when it is retrievd from the queue and before the Exchange is processed.")
+ private Boolean autoFinish = true;
+ @UriParam(label = "consumer", description = "The NSQ consumer timeout period for messages retrieved from the queue.")
+ private long messageTimeout;
/*
* URL a NSQ lookup server hostname.
@@ -161,6 +165,28 @@ public class NsqConfiguration {
this.sslContextParameters = sslContextParameters;
}
+ /**
+ * Automatically finish the NSQ message when it is retrieved from the quese and before the Exchange is processed.
+ */
+ public Boolean getAutoFinish() {
+ return autoFinish;
+ }
+
+ public void setAutoFinish(Boolean autoFinish) {
+ this.autoFinish = autoFinish;
+ }
+
+ /**
+ * The NSQ message timeout for a consumer.
+ */
+ public long getMessageTimeout() {
+ return messageTimeout;
+ }
+
+ public void setMessageTimeout(long messageTimeout) {
+ this.messageTimeout = messageTimeout;
+ }
+
private String splitServers() {
StringBuilder servers = new StringBuilder();
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
index 6402fe5..68faf37 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
@@ -7,8 +7,10 @@ import com.github.brainlag.nsq.callbacks.NSQMessageCallback;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import com.github.brainlag.nsq.lookup.NSQLookup;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.spi.Synchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,12 +27,12 @@ public class NsqConsumer extends DefaultConsumer {
private ExecutorService executor;
private boolean active;
NSQConsumer consumer;
- private final NsqConfiguration config;
+ private final NsqConfiguration configuration;
public NsqConsumer(NsqEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.processor = processor;
- this.config = getEndpoint().getNsqConfiguration();
+ this.configuration = getEndpoint().getNsqConfiguration();
}
@Override
@@ -47,14 +49,14 @@ public class NsqConsumer extends DefaultConsumer {
LOG.debug("Getting NSQ Connection");
NSQLookup lookup = new DefaultNSQLookup();
- for(ServerAddress server : config.getServerAddresses()) {
+ for(ServerAddress server : configuration.getServerAddresses()) {
lookup.addLookupAddress(server.getHost(),
- server.getPort() == 0 ? config.getLookupServerPort() : server.getPort());
+ server.getPort() == 0 ? configuration.getLookupServerPort() : server.getPort());
}
- consumer = new NSQConsumer(lookup, config.getTopic(),
- config.getChannel(), new CamelNsqMessageHandler());
- consumer.setLookupPeriod(config.getLookupInterval());
+ consumer = new NSQConsumer(lookup, configuration.getTopic(),
+ configuration.getChannel(), new CamelNsqMessageHandler(), getEndpoint().getNsqConfig());
+ consumer.setLookupPeriod(configuration.getLookupInterval());
consumer.setExecutor(getEndpoint().createExecutor());
consumer.start();
}
@@ -84,18 +86,45 @@ public class NsqConsumer extends DefaultConsumer {
@Override
public void message(NSQMessage msg) {
LOG.debug("Received Message: {}", msg);
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
exchange.getIn().setBody(msg.getMessage());
exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID, msg.getId());
exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, msg.getAttempts());
exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP, msg.getTimestamp());
try {
+ if (configuration.getAutoFinish()) {
+ msg.finished();
+ } else {
+ exchange.addOnCompletion(new NsqSynchronization(msg, (int) configuration.getRequeueInterval()));
+ }
processor.process(exchange);
- msg.finished();
} catch (Exception e) {
- msg.requeue((int) config.getRequeueInterval());
+ if (!configuration.getAutoFinish()) {
+ msg.requeue((int) configuration.getRequeueInterval());
+ }
getExceptionHandler().handleException("Error during processing", exchange, e);
}
}
}
+
+ class Sync implements Synchronization {
+
+ @Override
+ public void onComplete(final Exchange exchange) {
+ try {
+ //msg.finished();
+ } catch (Exception e) {
+ LOG.error(String.format("Could not run completion of exchange %s", exchange), e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Exchange exchange) {
+ try {
+ //msg.requeue((int) config.getRequeueInterval());
+ } catch (Exception e) {
+ LOG.error(String.format("Could not run failure of exchange %s", exchange), e);
+ }
+ }
+ }
}
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
index e792316..53c04ac 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqEndpoint.java
@@ -50,8 +50,14 @@ public class NsqEndpoint extends DefaultEndpoint {
public NSQConfig getNsqConfig() throws GeneralSecurityException, IOException {
NSQConfig nsqConfig = new NSQConfig();
- SslContext sslContext = new JdkSslContext(getNsqConfiguration().getSslContextParameters().createSSLContext(getCamelContext()), true, null);
- nsqConfig.setSslContext(sslContext);
+ if (getNsqConfiguration().getSslContextParameters() != null) {
+ SslContext sslContext = new JdkSslContext(getNsqConfiguration().getSslContextParameters().createSSLContext(getCamelContext()), true, null);
+ nsqConfig.setSslContext(sslContext);
+
+ }
+ if (configuration.getMessageTimeout() > 0) {
+ nsqConfig.setMsgTimeout((int) configuration.getMessageTimeout());
+ }
return nsqConfig;
}
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqSynchronization.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqSynchronization.java
new file mode 100644
index 0000000..58635ee
--- /dev/null
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqSynchronization.java
@@ -0,0 +1,26 @@
+package org.apache.camel.component.nsq;
+
+import com.github.brainlag.nsq.NSQMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.SynchronizationAdapter;
+
+public class NsqSynchronization extends SynchronizationAdapter {
+
+ private final NSQMessage nsqMessage;
+ private final int requeueInterval;
+
+ public NsqSynchronization(NSQMessage nsqMessage, int requeueInterval) {
+ this.nsqMessage = nsqMessage;
+ this.requeueInterval = requeueInterval;
+ }
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ nsqMessage.finished();
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ nsqMessage.requeue(requeueInterval);
+ }
+}
diff --git a/components/camel-nsq/src/test/java/org/apache/camel/component/nsq/NsqConsumerTest.java b/components/camel-nsq/src/test/java/org/apache/camel/component/nsq/NsqConsumerTest.java
index fca0ea3..7c15fdd 100644
--- a/components/camel-nsq/src/test/java/org/apache/camel/component/nsq/NsqConsumerTest.java
+++ b/components/camel-nsq/src/test/java/org/apache/camel/component/nsq/NsqConsumerTest.java
@@ -3,6 +3,8 @@ package org.apache.camel.component.nsq;
import com.github.brainlag.nsq.NSQProducer;
import com.github.brainlag.nsq.exceptions.NSQException;
import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
@@ -12,6 +14,7 @@ import java.util.concurrent.TimeoutException;
public class NsqConsumerTest extends NsqTestSupport {
private static final int NUMBER_OF_MESSAGES = 10000;
+ private static final String TOPIC = "test";
@EndpointInject(uri = "mock:result")
protected MockEndpoint mockResultEndpoint;
@@ -25,7 +28,7 @@ public class NsqConsumerTest extends NsqTestSupport {
producer.addAddress("localhost", 4150);
producer.start();
- producer.produce("test", ("Hello NSQ!").getBytes());
+ producer.produce(TOPIC, ("Hello NSQ!").getBytes());
mockResultEndpoint.assertIsSatisfied();
@@ -42,18 +45,43 @@ public class NsqConsumerTest extends NsqTestSupport {
producer.start();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
- producer.produce("test", ("test" + i).getBytes());
+ producer.produce(TOPIC, (String.format("Hello NSQ%d!", i)).getBytes());
}
mockResultEndpoint.assertIsSatisfied();
}
+ @Test
+ public void testRequeue() throws NSQException, TimeoutException, InterruptedException {
+ mockResultEndpoint.setExpectedMessageCount(1);
+ mockResultEndpoint.setAssertPeriod(5000);
+
+ NSQProducer producer = new NSQProducer();
+ producer.addAddress("localhost", 4150);
+ producer.start();
+
+ producer.produce(TOPIC, ("Test Requeue").getBytes());
+
+ mockResultEndpoint.assertIsSatisfied();
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- fromF("nsq://%s?topic=%s&lookupInterval=5s", getNsqConsumerUrl(), "test").to(mockResultEndpoint);
+ fromF("nsq://%s?topic=%s&lookupInterval=2s&autoFinish=false&requeueInterval=1s", getNsqConsumerUrl(), TOPIC)
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String messageText = exchange.getIn().getBody(String.class);
+ int attempts = exchange.getIn().getHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, Integer.class);
+ if (messageText.contains("Requeue") && attempts < 3) {
+ throw new Exception();
+ }
+ }
+ })
+ .to(mockResultEndpoint);
}
};
}