You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/04 09:36:48 UTC

[GitHub] pvillard31 closed pull request #3240: NIFI-5921: Added property to allow a user to define a timeout on the ConsumeJMS …

pvillard31 closed pull request #3240: NIFI-5921: Added property to allow a user to define a timeout on the ConsumeJMS …
URL: https://github.com/apache/nifi/pull/3240
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it after the merge:

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index ff05f6a922..1f8358e23e 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -48,6 +48,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Consuming JMS processor which upon each invocation of
@@ -128,6 +129,14 @@
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
+    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Timeout")
+            .description("How long to wait to consume a message from the remote broker before giving up.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 sec")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -154,6 +163,7 @@
         _propertyDescriptors.add(DURABLE_SUBSCRIBER);
         _propertyDescriptors.add(SHARED_SUBSCRIBER);
         _propertyDescriptors.add(SUBSCRIPTION_NAME);
+        _propertyDescriptors.add(TIMEOUT);
         thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -177,8 +187,9 @@ protected void rendezvousWithJms(final ProcessContext context, final ProcessSess
         final boolean shared = sharedBoolean == null ? false : sharedBoolean;
         final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
         final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
+        final long timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
 
-        consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
+        consumer.consume(destinationName, durable, shared, subscriptionName, charset, timeout, new ConsumerCallback() {
             @Override
             public void accept(final JMSResponse response) {
                 if (response == null) {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index a2c73b4e3b..e6e5913569 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -80,7 +80,7 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin
     }
 
 
-    public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
+    public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset, final long timeout,
                         final ConsumerCallback consumerCallback) {
         this.jmsTemplate.execute(new SessionCallback<Void>() {
             @Override
@@ -88,7 +88,7 @@ public Void doInJms(final Session session) throws JMSException {
 
                 final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
                 try {
-                    final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
+                    final Message message = msgConsumer.receive(timeout);
                     JMSResponse response = null;
 
                     if (message != null) {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index 7812e7185a..eed9276d09 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -113,7 +113,7 @@ public Message createMessage(Session session) throws JMSException {
             });
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     // noop
@@ -143,7 +143,7 @@ public Message createMessage(Session session) throws JMSException {
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
-            consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
@@ -190,7 +190,7 @@ public void accept(JMSResponse response) {
                         JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
 
                         for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
-                            consumer.consume(destinationName, false, false, null, "UTF-8", callback);
+                            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, callback);
                         }
                     } finally {
                         ((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@@ -229,7 +229,7 @@ public void validateMessageRedeliveryWhenNotAcked() throws Exception {
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
             try {
-                consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         callbackInvoked.set(true);
@@ -246,7 +246,7 @@ public void accept(JMSResponse response) {
 
             // should receive the same message, but will process it successfully
             while (!callbackInvoked.get()) {
-                consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         if (response == null) {
@@ -265,7 +265,7 @@ public void accept(JMSResponse response) {
             // receiving next message and fail again
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {
@@ -287,7 +287,7 @@ public void accept(JMSResponse response) {
             // should receive the same message, but will process it successfully
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services