You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/19 08:40:35 UTC

[camel] 01/02: CAMEL-12664 - Camel-Nats Refactoring consumer a bit

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2c451b563628bd3469b9f15f64f046cb6788ba25
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Jul 19 10:33:21 2018 +0200

    CAMEL-12664 - Camel-Nats Refactoring consumer a bit
---
 .../apache/camel/component/nats/NatsConsumer.java  | 46 +++++++++-------------
 1 file changed, 18 insertions(+), 28 deletions(-)

diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 9bcc8c4..51d9593 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -134,21 +134,8 @@ public class NatsConsumer extends DefaultConsumer {
         @Override
         public void run() {
             try {
+                dispatcher = connection.createDispatcher(new CamelNatsMessageHandler());
                 if (ObjectHelper.isNotEmpty(configuration.getQueueName())) {
-                    dispatcher = connection.createDispatcher(new MessageHandler() {
-                        @Override
-                        public void onMessage(Message msg) {
-                            LOG.debug("Received Message: {}", msg);
-                            Exchange exchange = getEndpoint().createExchange();
-                            exchange.getIn().setBody(msg);
-                            exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
-                            try {
-                                processor.process(exchange);
-                            } catch (Exception e) {
-                                getExceptionHandler().handleException("Error during processing", exchange, e);
-                            }
-                        }
-                    });
                     dispatcher = dispatcher.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName());
                     if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
                         dispatcher.unsubscribe(getEndpoint().getNatsConfiguration().getTopic(), Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
@@ -157,20 +144,6 @@ public class NatsConsumer extends DefaultConsumer {
                         setActive(true);
                     }
                 } else {
-                    dispatcher = connection.createDispatcher(new MessageHandler() {
-                        @Override
-                        public void onMessage(Message msg) {
-                            LOG.debug("Received Message: {}", msg);
-                            Exchange exchange = getEndpoint().createExchange();
-                            exchange.getIn().setBody(msg);
-                            exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
-                            try {
-                                processor.process(exchange);
-                            } catch (Exception e) {
-                                getExceptionHandler().handleException("Error during processing", exchange, e);
-                            }
-                        }
-                    });
                     dispatcher = dispatcher.subscribe(getEndpoint().getNatsConfiguration().getTopic());
                     if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
                         dispatcher.unsubscribe(getEndpoint().getNatsConfiguration().getTopic(), Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
@@ -182,6 +155,23 @@ public class NatsConsumer extends DefaultConsumer {
             } catch (Throwable e) {
                 getExceptionHandler().handleException("Error during processing", e);
             }
+            
+        }
+        
+        class CamelNatsMessageHandler implements MessageHandler {
+
+            @Override
+            public void onMessage(Message msg) throws InterruptedException {
+                LOG.debug("Received Message: {}", msg);
+                Exchange exchange = getEndpoint().createExchange();
+                exchange.getIn().setBody(msg);
+                exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+                try {
+                    processor.process(exchange);
+                } catch (Exception e) {
+                    getExceptionHandler().handleException("Error during processing", exchange, e);
+                }
+            }
         }
     }