You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/04 11:28:43 UTC

[GitHub] asfgit closed pull request #3245: NIFI-5921 - Timeout property for ConsumeJMS

asfgit closed pull request #3245: NIFI-5921 - Timeout property for ConsumeJMS
URL: https://github.com/apache/nifi/pull/3245
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it once the pull request is merged:

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 95a7103dea..0094eaf788 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -221,9 +221,6 @@ private T buildTargetResource(ProcessContext context) {
         jmsTemplate.setConnectionFactory(cachingFactory);
         jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
-        // set of properties that may be good candidates for exposure via configuration
-        jmsTemplate.setReceiveTimeout(1000);
-
         return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
     }
 }
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 1f8358e23e..997c6dd2e4 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -134,7 +134,7 @@
             .description("How long to wait to consume a message from the remote broker before giving up.")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("0 sec")
+            .defaultValue("1 sec")
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
@@ -187,9 +187,8 @@ protected void rendezvousWithJms(final ProcessContext context, final ProcessSess
         final boolean shared = sharedBoolean == null ? false : sharedBoolean;
         final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
         final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
-        final long timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
 
-        consumer.consume(destinationName, durable, shared, subscriptionName, charset, timeout, new ConsumerCallback() {
+        consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
             @Override
             public void accept(final JMSResponse response) {
                 if (response == null) {
@@ -220,6 +219,10 @@ public void accept(final JMSResponse response) {
     protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
         int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
         jmsTemplate.setSessionAcknowledgeMode(ackMode);
+
+        long timeout = processContext.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        jmsTemplate.setReceiveTimeout(timeout);
+
         return new JMSConsumer(connectionFactory, jmsTemplate, this.getLogger());
     }
 
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index e6e5913569..a2c73b4e3b 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -80,7 +80,7 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin
     }
 
 
-    public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset, final long timeout,
+    public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
                         final ConsumerCallback consumerCallback) {
         this.jmsTemplate.execute(new SessionCallback<Void>() {
             @Override
@@ -88,7 +88,7 @@ public Void doInJms(final Session session) throws JMSException {
 
                 final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
                 try {
-                    final Message message = msgConsumer.receive(timeout);
+                    final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
                     JMSResponse response = null;
 
                     if (message != null) {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index eed9276d09..7812e7185a 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -113,7 +113,7 @@ public Message createMessage(Session session) throws JMSException {
             });
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     // noop
@@ -143,7 +143,7 @@ public Message createMessage(Session session) throws JMSException {
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
-            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
@@ -190,7 +190,7 @@ public void accept(JMSResponse response) {
                         JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
 
                         for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
-                            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, callback);
+                            consumer.consume(destinationName, false, false, null, "UTF-8", callback);
                         }
                     } finally {
                         ((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@@ -229,7 +229,7 @@ public void validateMessageRedeliveryWhenNotAcked() throws Exception {
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
             try {
-                consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         callbackInvoked.set(true);
@@ -246,7 +246,7 @@ public void accept(JMSResponse response) {
 
             // should receive the same message, but will process it successfully
             while (!callbackInvoked.get()) {
-                consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         if (response == null) {
@@ -265,7 +265,7 @@ public void accept(JMSResponse response) {
             // receiving next message and fail again
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {
@@ -287,7 +287,7 @@ public void accept(JMSResponse response) {
             // should receive the same message, but will process it successfully
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services